6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -7117,6 +7117,12 @@
},
"sqlState" : "42809"
},
"UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION" : {
"message" : [
"INSERT WITH SCHEMA EVOLUTION ... is unsupported for this table format."
],
"sqlState" : "0A000"
},
"UNSUPPORTED_JOIN_TYPE" : {
"message" : [
"Unsupported join type '<typ>'. Supported join types include: <supported>."
@@ -560,9 +560,9 @@ query
;

insertInto
: INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
| INSERT INTO TABLE? identifierReference optionsClause? (BY NAME)? REPLACE whereClause #insertIntoReplaceWhere
: INSERT (WITH SCHEMA EVOLUTION)? OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)? #insertOverwriteTable
| INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable
| INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference optionsClause? (BY NAME)? REPLACE whereClause #insertIntoReplaceWhere
| INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir
| INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
;
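
For reference, a minimal sketch of the statement shapes the amended rule accepts, assuming hypothetical tables target and source and a connector that honors the mergeSchema write option (e.g. Delta or Iceberg):

// Sketch only: table names are hypothetical; runnable from spark-shell.
spark.sql("INSERT WITH SCHEMA EVOLUTION INTO target SELECT * FROM source")
spark.sql("INSERT WITH SCHEMA EVOLUTION OVERWRITE target SELECT * FROM source")
spark.sql("INSERT WITH SCHEMA EVOLUTION INTO target REPLACE WHERE id > 0 SELECT * FROM source")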
@@ -1096,7 +1096,7 @@ class Analyzer(

def apply(plan: LogicalPlan)
: LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
case i @ InsertIntoStatement(table, _, _, _, _, _, _, _) =>
val relation = table match {
case u: UnresolvedRelation if !u.isStreaming =>
resolveRelation(u).getOrElse(u)
@@ -1231,7 +1231,7 @@
object ResolveInsertInto extends ResolveInsertionBase {
override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
AlwaysProcess.fn, ruleId) {
case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _)
case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _, _)
if i.query.resolved =>
// ifPartitionNotExists is append with validation, but validation is not supported
if (i.ifPartitionNotExists) {
@@ -1249,27 +1249,50 @@
val partCols = partitionColumnNames(r.table)
validatePartitionSpec(partCols, i.partitionSpec)

val schemaEvolutionWriteOption: Map[String, String] =
if (i.withSchemaEvolution) Map("mergeSchema" -> "true") else Map.empty
cloud-fan (Contributor) commented:
why not add a new bool field to AppendData, like what we did for InsertIntoStatement? The MergeIntoTable also has a withSchemaEvolution flag.

longvu-db (Contributor Author) commented:
Hey @cloud-fan, thank you very much for your review! We also raised this point and this was our discussion on it: #53732 (comment)

val staticPartitions = i.partitionSpec.filter(_._2.isDefined).transform((_, v) => v.get)
val query = addStaticPartitionColumns(r, projectByName.getOrElse(i.query), staticPartitions,
isByName)

if (!i.overwrite) {
if (isByName) {
AppendData.byName(r, query)
AppendData.byName(
szehon-ho (Member) commented:
why not properly model the schemaEvolution flag in AppendData/ etc?

longvu-db (Contributor Author) commented on Jan 9, 2026:
@szehon-ho You mean we should add a flag schemaEvolution in place of writeOptions = schemaEvolutionWriteOption in v2 Write nodes, and add the schemaEvolutionWriteOption in https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala#L61 if the schemaEvolution flag is enabled? Do you have an example?

Contributor commented:
Having a dedicated flag would be cleaner: mergeSchema is fairly overloaded, e.g. it can also be a read option in Parquet that means inferring a superset schema from multiple Parquet files being read.

But: dataframe operations have always been using mergeSchema to enable schema evolution in Delta and Iceberg. By reusing the mergeSchema option, we automatically get schema evolution working there. If we introduce a new field, then until Delta/Iceberg pick it up, WITH SCHEMA EVOLUTION will essentially be ignored - not good.

I would use mergeSchema for now; we can still introduce a dedicated field later if we want to.

szehon-ho (Member) commented:
Makes sense, thanks.

r,
query,
writeOptions = schemaEvolutionWriteOption)
} else {
AppendData.byPosition(r, query)
AppendData.byPosition(
r,
query,
writeOptions = schemaEvolutionWriteOption)
}
} else if (conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC) {
if (isByName) {
OverwritePartitionsDynamic.byName(r, query)
OverwritePartitionsDynamic.byName(
r,
query,
writeOptions = schemaEvolutionWriteOption)
} else {
OverwritePartitionsDynamic.byPosition(r, query)
OverwritePartitionsDynamic.byPosition(
r,
query,
writeOptions = schemaEvolutionWriteOption)
}
} else {
if (isByName) {
OverwriteByExpression.byName(r, query, staticDeleteExpression(r, staticPartitions))
OverwriteByExpression.byName(
table = r,
df = query,
deleteExpr = staticDeleteExpression(r, staticPartitions),
writeOptions = schemaEvolutionWriteOption)
} else {
OverwriteByExpression.byPosition(r, query, staticDeleteExpression(r, staticPartitions))
OverwriteByExpression.byPosition(
table = r,
query = query,
deleteExpr = staticDeleteExpression(r, staticPartitions),
writeOptions = schemaEvolutionWriteOption)
}
}
}
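
As context for the review thread above, here is a minimal sketch of how the DataFrame API has been requesting the same behavior through the mergeSchema write option; the format and table name are illustrative, and whether the option is honored is up to the connector (e.g. Delta or Iceberg):

// The SQL path above now attaches the same option to the v2 write nodes.
val df = spark.range(10).withColumn("extra", org.apache.spark.sql.functions.lit("x"))
df.write
  .format("delta")               // illustrative connector
  .option("mergeSchema", "true") // request schema evolution on write
  .mode("append")
  .saveAsTable("target")         // hypothetical table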
@@ -254,7 +254,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
// not found first, instead of errors in the input query of the insert command, by doing a
// top-down traversal.
plan.foreach {
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _) =>
case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _, _) =>
u.tableNotFound(u.multipartIdentifier)

// TODO (SPARK-27484): handle streaming write commands when we have them.
@@ -875,9 +875,12 @@ class AstBuilder extends DataTypeAstBuilder
/**
* Add an
* {{{
* INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList]
* INSERT INTO [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList])
* INSERT INTO [TABLE] tableIdentifier REPLACE whereClause
* INSERT [WITH SCHEMA EVOLUTION] OVERWRITE
* TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList]
* INSERT [WITH SCHEMA EVOLUTION] INTO
* [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList])
* INSERT [WITH SCHEMA EVOLUTION] INTO
* [TABLE] tableIdentifier REPLACE whereClause
* INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
* INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList]
* }}}
@@ -906,7 +909,8 @@
query = otherPlans.head,
overwrite = false,
ifPartitionNotExists = insertParams.ifPartitionNotExists,
byName = insertParams.byName)
byName = insertParams.byName,
withSchemaEvolution = table.EVOLUTION() != null)
})
case table: InsertOverwriteTableContext =>
val insertParams = visitInsertOverwriteTable(table)
@@ -923,7 +927,8 @@
query = otherPlans.head,
overwrite = true,
ifPartitionNotExists = insertParams.ifPartitionNotExists,
byName = insertParams.byName)
byName = insertParams.byName,
withSchemaEvolution = table.EVOLUTION() != null)
})
case ctx: InsertIntoReplaceWhereContext =>
val options = Option(ctx.optionsClause())
@@ -932,10 +937,20 @@
Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), isStreaming = false)
val deleteExpr = expression(ctx.whereClause().booleanExpression())
val isByName = ctx.NAME() != null
val schemaEvolutionWriteOption: Map[String, String] =
if (ctx.EVOLUTION() != null) Map("mergeSchema" -> "true") else Map.empty
if (isByName) {
OverwriteByExpression.byName(table, otherPlans.head, deleteExpr)
OverwriteByExpression.byName(
table,
df = otherPlans.head,
deleteExpr,
writeOptions = schemaEvolutionWriteOption)
} else {
OverwriteByExpression.byPosition(table, otherPlans.head, deleteExpr)
OverwriteByExpression.byPosition(
table,
query = otherPlans.head,
deleteExpr,
writeOptions = schemaEvolutionWriteOption)
}
})
case dir: InsertOverwriteDirContext =>
@@ -173,6 +173,7 @@ case class QualifiedColType(
* Only valid for static partitions.
* @param byName If true, reorder the data columns to match the column names of the
* target table.
* @param withSchemaEvolution If true, enables automatic schema evolution for the operation.
*/
case class InsertIntoStatement(
table: LogicalPlan,
@@ -181,7 +182,8 @@ case class InsertIntoStatement(
query: LogicalPlan,
overwrite: Boolean,
ifPartitionNotExists: Boolean,
byName: Boolean = false) extends UnaryParsedStatement {
byName: Boolean = false,
withSchemaEvolution: Boolean = false) extends UnaryParsedStatement {

require(overwrite || !ifPartitionNotExists,
"IF NOT EXISTS is only valid in INSERT OVERWRITE")
@@ -576,6 +576,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
origin = t.origin)
}

def unsupportedInsertWithSchemaEvolution(): Throwable = {
new AnalysisException(
errorClass = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION",
messageParameters = Map.empty)
}

def writeIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = {
new AnalysisException(
errorClass = "VIEW_WRITE_NOT_ALLOWED",
@@ -1790,6 +1790,90 @@ class DDLParserSuite extends AnalysisTest {
Literal(5))))
}

for {
isByName <- Seq(true, false)
userSpecifiedCols <- if (!isByName) {
Seq(Seq("a", "b"), Seq.empty)
} else {
Seq(Seq.empty)
}
} {
val byNameClause = if (isByName) "BY NAME " else ""
val sourceQuery = "SELECT * FROM source"
val userSpecifiedColsClause =
if (userSpecifiedCols.isEmpty) "" else userSpecifiedCols.mkString("(", ", ", ")")
val testMsg = s"isByName=$isByName, userSpecifiedColsClause=$userSpecifiedColsClause"

test(s"INSERT INTO with WITH SCHEMA EVOLUTION - $testMsg") {
val table = "testcat.ns1.ns2.tbl"
val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION INTO $table " +
s"${userSpecifiedColsClause}${byNameClause}${sourceQuery}"

parseCompare(
sql = insertSQLStmt,
expected = InsertIntoStatement(
table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
partitionSpec = Map.empty,
userSpecifiedCols = userSpecifiedCols,
query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
overwrite = false,
ifPartitionNotExists = false,
byName = isByName,
withSchemaEvolution = true)
)
}

test(s"INSERT OVERWRITE (static) with WITH SCHEMA EVOLUTION - $testMsg") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
SQLConf.PartitionOverwriteMode.STATIC.toString) {
val table = "testcat.ns1.ns2.tbl"
val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION OVERWRITE $table " +
s"${userSpecifiedColsClause}${byNameClause}${sourceQuery}"

parseCompare(
sql = insertSQLStmt,
expected = InsertIntoStatement(
table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
partitionSpec = Map.empty,
userSpecifiedCols = userSpecifiedCols,
query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
overwrite = true,
ifPartitionNotExists = false,
byName = isByName,
withSchemaEvolution = true)
)
}
}
}

for (isByName <- Seq(true, false)) {
val byNameClause = if (isByName) "BY NAME " else ""
val sourceQuery = "SELECT * FROM source"
val testMsg = s"isByName=$isByName"

test(s"INSERT OVERWRITE (dynamic) with WITH SCHEMA EVOLUTION - $testMsg") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
SQLConf.PartitionOverwriteMode.DYNAMIC.toString) {
val table = "testcat.ns1.ns2.tbl"
val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION OVERWRITE $table " +
s"${byNameClause}${sourceQuery}"

parseCompare(
sql = insertSQLStmt,
expected = InsertIntoStatement(
table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
partitionSpec = Map.empty,
userSpecifiedCols = Seq.empty,
query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
overwrite = true,
ifPartitionNotExists = false,
byName = isByName,
withSchemaEvolution = true)
)
}
}
}

test("delete from table: delete all") {
parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
DeleteFromTable(
@@ -162,7 +162,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))

case InsertIntoStatement(l @ LogicalRelationWithTable(_: InsertableRelation, _),
parts, _, query, overwrite, false, _) if parts.isEmpty =>
parts, _, query, overwrite, false, _, _)
if parts.isEmpty =>
InsertIntoDataSourceCommand(l, query, overwrite)

case InsertIntoDir(_, storage, provider, query, overwrite)
@@ -173,8 +174,8 @@

InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)

case i @ InsertIntoStatement(
l @ LogicalRelationWithTable(t: HadoopFsRelation, table), parts, _, query, overwrite, _, _)
case i @ InsertIntoStatement(l @ LogicalRelationWithTable(t: HadoopFsRelation, table),
parts, _, query, overwrite, _, _, _)
if query.resolved =>
// If the InsertIntoTable command is for a partitioned HadoopFsRelation and
// the user has specified static partitions, we add a Project operator on top of the query
@@ -307,11 +308,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]

override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false),
_, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
_, _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
i.copy(table = readDataSourceTable(tableMeta, options))

case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false),
_, _, _, _, _, _) =>
_, _, _, _, _, _, _) =>
i.copy(table = DDLUtils.readHiveTable(tableMeta))

case append @ AppendData(
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable}
class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoStatement(
d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _) =>
d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _, _) =>
val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance()
val relation = HadoopFsRelation(
table.fileIndex,
@@ -462,6 +462,10 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
partColNames: StructType,
catalogTable: Option[CatalogTable]): InsertIntoStatement = {

if (insert.withSchemaEvolution) {
throw QueryCompilationErrors.unsupportedInsertWithSchemaEvolution()
}

val normalizedPartSpec = normalizePartitionSpec(
insert.partitionSpec, partColNames, tblName, conf.resolver)

@@ -526,7 +530,8 @@
}

def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
case i @ InsertIntoStatement(table, _, _, query, _, _, _) if table.resolved && query.resolved =>
case i @ InsertIntoStatement(table, _, _, query, _, _, _, _)
if table.resolved && query.resolved =>
table match {
case relation: HiveTableRelation =>
val metadata = relation.tableMeta
@@ -606,7 +611,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
def apply(plan: LogicalPlan): Unit = {
plan.foreach {
case InsertIntoStatement(LogicalRelationWithTable(relation, _), partition,
_, query, _, _, _) =>
_, query, _, _, _, _) =>
// Get all input data source relations of the query.
val srcRelations = query.collect {
case l: LogicalRelation => l.relation
@@ -635,7 +640,7 @@
messageParameters = Map("relationId" -> toSQLId(relation.toString)))
}

case InsertIntoStatement(t, _, _, _, _, _, _)
case InsertIntoStatement(t, _, _, _, _, _, _, _)
if !t.isInstanceOf[LeafNode] ||
t.isInstanceOf[Range] ||
t.isInstanceOf[OneRowRelation] ||
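
A minimal sketch of how this check surfaces to users, assuming a hypothetical table t that takes the v1 insertion path (v1 inserts are preprocessed by PreprocessTableInsertion, which rejects the clause):

// Hypothetical v1 table; the clause is rejected at analysis time.
spark.sql("CREATE TABLE t (id INT) USING parquet")
// Throws AnalysisException with error class
// UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION (SQLSTATE 0A000):
spark.sql("INSERT WITH SCHEMA EVOLUTION INTO t VALUES (1)")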
@@ -208,7 +208,7 @@ ExplainCommand 'Aggregate ['key], ['key, unresolvedalias('MIN('val))], Formatted
-- !query
EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
-- !query analysis
ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, ExtendedMode
ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, false, ExtendedMode


-- !query
@@ -208,7 +208,7 @@ ExplainCommand 'Aggregate ['key], ['key, unresolvedalias('MIN('val))], Formatted
-- !query
EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
-- !query analysis
ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, ExtendedMode
ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, false, ExtendedMode


-- !query