diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 1979fbffe3612..f34234c822976 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -7117,6 +7117,12 @@
     },
     "sqlState" : "42809"
   },
+  "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION" : {
+    "message" : [
+      "INSERT WITH SCHEMA EVOLUTION ... is unsupported for this table format."
+    ],
+    "sqlState" : "0A000"
+  },
   "UNSUPPORTED_JOIN_TYPE" : {
     "message" : [
       "Unsupported join type '<typ>'. Supported join types include: <supported>."
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 05662cd0beefc..060c91120d184 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -560,9 +560,9 @@ query
     ;
 
 insertInto
-    : INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)?    #insertOverwriteTable
-    | INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)?    #insertIntoTable
-    | INSERT INTO TABLE? identifierReference optionsClause? (BY NAME)? REPLACE whereClause    #insertIntoReplaceWhere
+    : INSERT (WITH SCHEMA EVOLUTION)? OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)? ((BY NAME) | identifierList)?    #insertOverwriteTable
+    | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)?    #insertIntoTable
+    | INSERT (WITH SCHEMA EVOLUTION)? INTO TABLE? identifierReference optionsClause? (BY NAME)? REPLACE whereClause    #insertIntoReplaceWhere
     | INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat?    #insertOverwriteHiveDir
     | INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)?    #insertOverwriteDir
     ;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 23ee8bc7c9eb4..6dc397a2960ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1096,7 +1096,7 @@ class Analyzer(
     def apply(plan: LogicalPlan)
         : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
+      case i @ InsertIntoStatement(table, _, _, _, _, _, _, _) =>
         val relation = table match {
           case u: UnresolvedRelation if !u.isStreaming =>
             resolveRelation(u).getOrElse(u)
@@ -1231,7 +1231,7 @@ class Analyzer(
   object ResolveInsertInto extends ResolveInsertionBase {
     override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
       AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _)
+      case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _, _)
           if i.query.resolved =>
         // ifPartitionNotExists is append with validation, but validation is not supported
         if (i.ifPartitionNotExists) {
@@ -1249,27 +1249,50 @@ class Analyzer(
         val partCols = partitionColumnNames(r.table)
         validatePartitionSpec(partCols, i.partitionSpec)
 
+        val schemaEvolutionWriteOption: Map[String, String] =
+          if (i.withSchemaEvolution) Map("mergeSchema" -> "true") else Map.empty
+
         val staticPartitions = i.partitionSpec.filter(_._2.isDefined).transform((_, v) => v.get)
         val query = addStaticPartitionColumns(r, projectByName.getOrElse(i.query),
           staticPartitions, isByName)
 
         if (!i.overwrite) {
           if (isByName) {
-            AppendData.byName(r, query)
+            AppendData.byName(
+              r,
+              query,
+              writeOptions = schemaEvolutionWriteOption)
           } else {
-            AppendData.byPosition(r, query)
+            AppendData.byPosition(
+              r,
+              query,
+              writeOptions = schemaEvolutionWriteOption)
           }
         } else if (conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC) {
           if (isByName) {
-            OverwritePartitionsDynamic.byName(r, query)
+            OverwritePartitionsDynamic.byName(
+              r,
+              query,
+              writeOptions = schemaEvolutionWriteOption)
           } else {
-            OverwritePartitionsDynamic.byPosition(r, query)
+            OverwritePartitionsDynamic.byPosition(
+              r,
+              query,
+              writeOptions = schemaEvolutionWriteOption)
           }
         } else {
           if (isByName) {
-            OverwriteByExpression.byName(r, query, staticDeleteExpression(r, staticPartitions))
+            OverwriteByExpression.byName(
+              table = r,
+              df = query,
+              deleteExpr = staticDeleteExpression(r, staticPartitions),
+              writeOptions = schemaEvolutionWriteOption)
           } else {
-            OverwriteByExpression.byPosition(r, query, staticDeleteExpression(r, staticPartitions))
+            OverwriteByExpression.byPosition(
+              table = r,
+              query = query,
+              deleteExpr = staticDeleteExpression(r, staticPartitions),
+              writeOptions = schemaEvolutionWriteOption)
           }
         }
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index c34ae507e7583..02e3e8d4012a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -254,7 +254,7 @@ trait CheckAnalysis extends LookupCatalog with QueryErrorsBase with PlanToString
     // not found first, instead of errors in the input query of the insert command, by doing a
     // top-down traversal.
     plan.foreach {
-      case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _) =>
+      case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _, _) =>
         u.tableNotFound(u.multipartIdentifier)
 
       // TODO (SPARK-27484): handle streaming write commands when we have them.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 780a06060341a..96d379c771072 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -875,9 +875,12 @@ class AstBuilder extends DataTypeAstBuilder
   /**
    * Add an
    * {{{
-   *   INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList]
-   *   INSERT INTO [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList])
-   *   INSERT INTO [TABLE] tableIdentifier REPLACE whereClause
+   *   INSERT [WITH SCHEMA EVOLUTION] OVERWRITE
+   *     TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList]
+   *   INSERT [WITH SCHEMA EVOLUTION] INTO
+   *     [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList])
+   *   INSERT [WITH SCHEMA EVOLUTION] INTO
+   *     [TABLE] tableIdentifier REPLACE whereClause
    *   INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
    *   INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList]
    * }}}
@@ -906,7 +909,8 @@ class AstBuilder extends DataTypeAstBuilder
             query = otherPlans.head,
             overwrite = false,
             ifPartitionNotExists = insertParams.ifPartitionNotExists,
-            byName = insertParams.byName)
+            byName = insertParams.byName,
+            withSchemaEvolution = table.EVOLUTION() != null)
         })
       case table: InsertOverwriteTableContext =>
         val insertParams = visitInsertOverwriteTable(table)
@@ -923,7 +927,8 @@ class AstBuilder extends DataTypeAstBuilder
             query = otherPlans.head,
             overwrite = true,
             ifPartitionNotExists = insertParams.ifPartitionNotExists,
-            byName = insertParams.byName)
+            byName = insertParams.byName,
+            withSchemaEvolution = table.EVOLUTION() != null)
         })
      case ctx: InsertIntoReplaceWhereContext =>
        val options = Option(ctx.optionsClause())
@@ -932,10 +937,20 @@ class AstBuilder extends DataTypeAstBuilder
          Seq(TableWritePrivilege.INSERT, TableWritePrivilege.DELETE), isStreaming = false)
        val deleteExpr = expression(ctx.whereClause().booleanExpression())
        val isByName = ctx.NAME() != null
+       val schemaEvolutionWriteOption: Map[String, String] =
+         if (ctx.EVOLUTION() != null) Map("mergeSchema" -> "true") else Map.empty
        if (isByName) {
-         OverwriteByExpression.byName(table, otherPlans.head, deleteExpr)
+         OverwriteByExpression.byName(
+           table,
+           df = otherPlans.head,
+           deleteExpr,
+           writeOptions = schemaEvolutionWriteOption)
        } else {
-         OverwriteByExpression.byPosition(table, otherPlans.head, deleteExpr)
+         OverwriteByExpression.byPosition(
+           table,
+           query = otherPlans.head,
+           deleteExpr,
+           writeOptions = schemaEvolutionWriteOption)
        }
      })
    case dir: InsertOverwriteDirContext =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index a9e0650010d48..0930cf17a1db9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -173,6 +173,7 @@ case class QualifiedColType(
  *                            Only valid for static partitions.
  * @param byName              If true, reorder the data columns to match the column names of the
  *                            target table.
+ * @param withSchemaEvolution If true, enables automatic schema evolution for the operation.
  */
 case class InsertIntoStatement(
     table: LogicalPlan,
@@ -181,7 +182,8 @@
     query: LogicalPlan,
     overwrite: Boolean,
     ifPartitionNotExists: Boolean,
-    byName: Boolean = false) extends UnaryParsedStatement {
+    byName: Boolean = false,
+    withSchemaEvolution: Boolean = false) extends UnaryParsedStatement {
 
   require(overwrite || !ifPartitionNotExists, "IF NOT EXISTS is only valid in INSERT OVERWRITE")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 12133849712f1..587ee281e1f67 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -576,6 +576,12 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
       origin = t.origin)
   }
 
+  def unsupportedInsertWithSchemaEvolution(): Throwable = {
+    new AnalysisException(
+      errorClass = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION",
+      messageParameters = Map.empty)
+  }
+
   def writeIntoViewNotAllowedError(identifier: TableIdentifier, t: TreeNode[_]): Throwable = {
     new AnalysisException(
       errorClass = "VIEW_WRITE_NOT_ALLOWED",
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 9e2def3072abb..bda23c665cb92 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1790,6 +1790,90 @@ class DDLParserSuite extends AnalysisTest {
         Literal(5))))
   }
 
+  for {
+    isByName <- Seq(true, false)
+    userSpecifiedCols <- if (!isByName) {
+      Seq(Seq("a", "b"), Seq.empty)
+    } else {
+      Seq(Seq.empty)
+    }
+  } {
+    val byNameClause = if (isByName) "BY NAME " else ""
+    val sourceQuery = "SELECT * FROM source"
+    val userSpecifiedColsClause =
+      if (userSpecifiedCols.isEmpty) "" else userSpecifiedCols.mkString("(", ", ", ")")
+    val testMsg = s"isByName=$isByName, userSpecifiedColsClause=$userSpecifiedColsClause"
+
+    test(s"INSERT INTO with WITH SCHEMA EVOLUTION - $testMsg") {
+      val table = "testcat.ns1.ns2.tbl"
+      val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION INTO $table " +
+        s"${userSpecifiedColsClause}${byNameClause}${sourceQuery}"
+
+      parseCompare(
+        sql = insertSQLStmt,
+        expected = InsertIntoStatement(
+          table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+          partitionSpec = Map.empty,
+          userSpecifiedCols = userSpecifiedCols,
+          query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
+          overwrite = false,
+          ifPartitionNotExists = false,
+          byName = isByName,
+          withSchemaEvolution = true)
+      )
+    }
+
+    test(s"INSERT OVERWRITE (static) with WITH SCHEMA EVOLUTION - $testMsg") {
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+          SQLConf.PartitionOverwriteMode.STATIC.toString) {
+        val table = "testcat.ns1.ns2.tbl"
+        val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION OVERWRITE $table " +
+          s"${userSpecifiedColsClause}${byNameClause}${sourceQuery}"
+
+        parseCompare(
+          sql = insertSQLStmt,
+          expected = InsertIntoStatement(
+            table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+            partitionSpec = Map.empty,
+            userSpecifiedCols = userSpecifiedCols,
+            query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
+            overwrite = true,
+            ifPartitionNotExists = false,
+            byName = isByName,
+            withSchemaEvolution = true)
+        )
+      }
+    }
+  }
+
+  for (isByName <- Seq(true, false)) {
+    val byNameClause = if (isByName) "BY NAME " else ""
+    val sourceQuery = "SELECT * FROM source"
+    val testMsg = s"isByName=$isByName"
+
+    test(s"INSERT OVERWRITE (dynamic) with WITH SCHEMA EVOLUTION - $testMsg") {
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+          SQLConf.PartitionOverwriteMode.DYNAMIC.toString) {
+        val table = "testcat.ns1.ns2.tbl"
+        val insertSQLStmt = s"INSERT WITH SCHEMA EVOLUTION OVERWRITE $table " +
+          s"${byNameClause}${sourceQuery}"
+
+        parseCompare(
+          sql = insertSQLStmt,
+          expected = InsertIntoStatement(
+            table = UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+            partitionSpec = Map.empty,
+            userSpecifiedCols = Seq.empty,
+            query = Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))),
+            overwrite = true,
+            ifPartitionNotExists = false,
+            byName = isByName,
+            withSchemaEvolution = true)
+        )
+      }
+    }
+  }
+
   test("delete from table: delete all") {
     parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
       DeleteFromTable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 539bcb6c29e89..18707c5b24670 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -162,7 +162,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
       CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name))
 
     case InsertIntoStatement(l @ LogicalRelationWithTable(_: InsertableRelation, _),
-        parts, _, query, overwrite, false, _) if parts.isEmpty =>
+        parts, _, query, overwrite, false, _, _)
+        if parts.isEmpty =>
       InsertIntoDataSourceCommand(l, query, overwrite)
 
     case InsertIntoDir(_, storage, provider, query, overwrite)
@@ -173,8 +174,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
 
       InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
 
-    case i @ InsertIntoStatement(
-        l @ LogicalRelationWithTable(t: HadoopFsRelation, table), parts, _, query, overwrite, _, _)
+    case i @ InsertIntoStatement(l @ LogicalRelationWithTable(t: HadoopFsRelation, table),
+        parts, _, query, overwrite, _, _, _)
        if query.resolved =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
       // the user has specified static partitions, we add a Project operator on top of the query
@@ -307,11 +308,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false),
-        _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
+        _, _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
       i.copy(table = readDataSourceTable(tableMeta, options))
 
     case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false),
-        _, _, _, _, _, _) =>
+        _, _, _, _, _, _, _) =>
       i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
     case append @ AppendData(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
index 60c459ecf5407..e03d6e6772fa1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.{ExtractV2Table, FileTable}
 class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoStatement(
-        d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _) =>
+        d @ ExtractV2Table(table: FileTable), _, _, _, _, _, _, _) =>
       val v1FileFormat = table.fallbackFileFormat.getDeclaredConstructor().newInstance()
       val relation = HadoopFsRelation(
         table.fileIndex,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b8e0627dfc43a..f097e1aa63799 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -462,6 +462,10 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
       partColNames: StructType,
       catalogTable: Option[CatalogTable]): InsertIntoStatement = {
 
+    if (insert.withSchemaEvolution) {
+      throw QueryCompilationErrors.unsupportedInsertWithSchemaEvolution()
+    }
+
     val normalizedPartSpec = normalizePartitionSpec(
       insert.partitionSpec, partColNames, tblName, conf.resolver)
 
@@ -526,7 +530,8 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case i @ InsertIntoStatement(table, _, _, query, _, _, _) if table.resolved && query.resolved =>
+    case i @ InsertIntoStatement(table, _, _, query, _, _, _, _)
+        if table.resolved && query.resolved =>
       table match {
         case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
@@ -606,7 +611,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
       case InsertIntoStatement(LogicalRelationWithTable(relation, _), partition,
-          _, query, _, _, _) =>
+          _, query, _, _, _, _) =>
        // Get all input data source relations of the query.
        val srcRelations = query.collect {
          case l: LogicalRelation => l.relation
@@ -635,7 +640,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
             messageParameters = Map("relationId" -> toSQLId(relation.toString)))
         }
 
-      case InsertIntoStatement(t, _, _, _, _, _, _)
+      case InsertIntoStatement(t, _, _, _, _, _, _, _)
         if !t.isInstanceOf[LeafNode] ||
           t.isInstanceOf[Range] ||
          t.isInstanceOf[OneRowRelation] ||
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
index 89ba8971a60d1..4b9bb859cd567 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
@@ -208,7 +208,7 @@ ExplainCommand 'Aggregate ['key], ['key, unresolvedalias('MIN('val))], Formatted
 -- !query
 EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 -- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, false, ExtendedMode
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
index 89ba8971a60d1..4b9bb859cd567 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
@@ -208,7 +208,7 @@ ExplainCommand 'Aggregate ['key], ['key, unresolvedalias('MIN('val))], Formatted
 -- !query
 EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 -- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, false, ExtendedMode
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index f2df635a5a4ff..be22c74f43b05 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -1183,7 +1183,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 struct<plan:string>
 -- !query output
 == Parsed Logical Plan ==
-'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, false
 +- 'Project [*]
    +- 'UnresolvedRelation [explain_temp4], [], false
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 221a323b01bb2..e6db39f7913c6 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -1075,7 +1075,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 struct<plan:string>
 -- !query output
 == Parsed Logical Plan ==
-'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [__required_write_privileges__=INSERT], false, false, false, false, false
 +- 'Project [*]
    +- 'UnresolvedRelation [explain_temp4], [], false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 8e5ee1644f9c4..65028eb1777e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1249,7 +1249,7 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
       case InsertIntoStatement(
         _, _, _,
         UnresolvedInlineTable(_, Seq(Seq(UnresolvedAttribute(Seq("DEFAULT"))))),
-        _, _, _) =>
+        _, _, _, _) =>
       case _ =>
         fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
     }
@@ -1257,7 +1257,7 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
       case InsertIntoStatement(
         _, _, _,
         Project(Seq(UnresolvedAttribute(Seq("DEFAULT"))), _),
-        _, _, _) =>
+        _, _, _, _) =>
       case _ =>
         fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
     }
@@ -1325,6 +1325,79 @@ class PlanResolutionSuite extends SharedSparkSession with AnalysisTest {
     }
   }
 
+  for {
+    withSchemaEvolution <- Seq(true, false)
+    isByName <- Seq(true, false)
+  } {
+    val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION " else ""
+    val byNameClause = if (isByName) "BY NAME " else ""
+    val testMsg = s"withSchemaEvolution=$withSchemaEvolution, isByName=$isByName"
+
+    test(s"INSERT INTO: mergeSchema write option with WITH SCHEMA EVOLUTION - $testMsg") {
+      val table = "testcat.tab"
+      val insertSQLStmt = s"INSERT ${schemaEvolutionClause}INTO $table ${byNameClause}"
+
+      val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM v2Table") match {
+        case appendData: AppendData =>
+          appendData.writeOptions
+        case other =>
+          fail(s"Expected AppendData, but got: ${other.getClass.getSimpleName}")
+      }
+      assert(writeOptions.get("mergeSchema") ===
+        (if (withSchemaEvolution) Some("true") else None))
+    }
+
+    test(s"INSERT OVERWRITE (static): mergeSchema write option with WITH SCHEMA EVOLUTION - " +
+      testMsg) {
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
+        val table = "testcat.tab"
+        val insertSQLStmt = s"INSERT ${schemaEvolutionClause}OVERWRITE $table ${byNameClause}"
+
+        val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM v2Table") match {
+          case overwriteByExpression: OverwriteByExpression =>
+            overwriteByExpression.writeOptions
+          case other =>
+            fail(s"Expected OverwriteByExpression, but got: ${other.getClass.getSimpleName}")
+        }
+        assert(writeOptions.get("mergeSchema") ===
+          (if (withSchemaEvolution) Some("true") else None))
+      }
+    }
+
+    test(s"INSERT OVERWRITE (dynamic): mergeSchema write option with WITH SCHEMA EVOLUTION - " +
+      testMsg) {
+      withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key ->
+          PartitionOverwriteMode.DYNAMIC.toString) {
+        val table = "testcat.tab"
+        val insertSQLStmt = s"INSERT ${schemaEvolutionClause}OVERWRITE $table ${byNameClause}"
+
+        val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM v2Table") match {
+          case overwritePartitionsDynamic: OverwritePartitionsDynamic =>
+            overwritePartitionsDynamic.writeOptions
+          case other =>
+            fail(s"Expected OverwritePartitionsDynamic, but got: ${other.getClass.getSimpleName}")
+        }
+        assert(writeOptions.get("mergeSchema") ===
+          (if (withSchemaEvolution) Some("true") else None))
+      }
+    }
+
+    test(s"REPLACE WHERE: mergeSchema write option with WITH SCHEMA EVOLUTION - $testMsg") {
+      val table = "testcat.tab"
+      val insertSQLStmt =
+        s"INSERT ${schemaEvolutionClause}INTO $table ${byNameClause}REPLACE WHERE i = 1"
+
+      val writeOptions = parseAndResolve(s"$insertSQLStmt SELECT * FROM v2Table") match {
+        case overwriteByExpression: OverwriteByExpression =>
+          overwriteByExpression.writeOptions
+        case other =>
+          fail(s"Expected OverwriteByExpression, but got: ${other.getClass.getSimpleName}")
+      }
+      assert(writeOptions.get("mergeSchema") ===
+        (if (withSchemaEvolution) Some("true") else None))
+    }
+  }
+
   test("alter table: alter column") {
     Seq("v1Table" -> true, "v2Table" -> false, "testcat.tab" -> false).foreach {
       case (tblName, useV1Command) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 3cd289f7c6d86..d92e796455714 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -960,6 +960,43 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("SPARK-54971: INSERT WITH SCHEMA EVOLUTION is currently unsupported") {
+    withTable("test_table") {
+      val schema = new StructType()
+        .add("i", LongType, false)
+        .add("s", StringType, false)
+      val newTable = CatalogTable(
+        identifier = TableIdentifier("test_table", None),
+        tableType = CatalogTableType.MANAGED,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          compressed = false,
+          properties = Map.empty),
+        schema = schema,
+        provider = Some(classOf[SimpleInsertSource].getName))
+
+      spark.sessionState.catalog.createTable(newTable, false)
+
+      sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE test_table SELECT 1, 'a'")
+        },
+        condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+      )
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE test_table SELECT 1, 'a', 2")
+        },
+        condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+      )
+    }
+  }
+
   test("Allow user to insert specified columns into insertable view") {
     sql("INSERT OVERWRITE TABLE jsonTable SELECT a, DEFAULT FROM jt")
     checkAnswer(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 07d9df59b86d8..28cb8d9e20408 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -153,7 +153,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
     // handles InsertIntoStatement specially as the table in InsertIntoStatement is not added in its
     // children, hence not matched directly by previous HiveTableRelation case.
-    case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, _)
+    case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, _, _)
       if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
       i.copy(table = hiveTableWithStats(relation))
   }
@@ -168,7 +168,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case InsertIntoStatement(
-        r: HiveTableRelation, partSpec, _, query, overwrite, ifPartitionNotExists, _)
+        r: HiveTableRelation, partSpec, _, query, overwrite, ifPartitionNotExists, _, _)
         if DDLUtils.isHiveTable(r.tableMeta) && query.resolved =>
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
         ifPartitionNotExists, query.output.map(_.name))
@@ -236,13 +236,27 @@ case class RelationConversions(
     plan resolveOperators {
       // Write path
       case InsertIntoStatement(
-          r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists, byName)
+          r: HiveTableRelation,
+          partition,
+          cols,
+          query,
+          overwrite,
+          ifPartitionNotExists,
+          byName,
+          withSchemaEvolution)
           if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
             ((r.isPartitioned && conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)) ||
               (!r.isPartitioned && conf.getConf(HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE))) &&
             isConvertible(r) =>
-        InsertIntoStatement(metastoreCatalog.convert(r, isWrite = true), partition, cols,
-          query, overwrite, ifPartitionNotExists, byName)
+        InsertIntoStatement(
+          metastoreCatalog.convert(r, isWrite = true),
+          partition,
+          cols,
+          query,
+          overwrite,
+          ifPartitionNotExists,
+          byName,
+          withSchemaEvolution)
 
       // Read path
       case relation: HiveTableRelation if doConvertHiveTableRelationForRead(relation) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index f9c001a1a0777..2e45307d91028 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -451,6 +451,30 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
     }
   }
 
+  testPartitionedTable("SPARK-54971: INSERT WITH SCHEMA EVOLUTION is currently unsupported") {
+    tableName =>
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE $tableName SELECT 25, 26, 27, 28")
+        },
+        condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+      )
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE $tableName SELECT 25, 26, 27, 28, 29")
+        },
+        condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+      )
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(s"INSERT WITH SCHEMA EVOLUTION INTO TABLE $tableName SELECT 25, 26, 27, (28, 29)")
+        },
+        condition = "UNSUPPORTED_INSERT_WITH_SCHEMA_EVOLUTION"
+      )
+  }
+
   testPartitionedTable("insertInto() should match columns by position and ignore column names") {
     tableName =>
       withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
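For reference, a minimal usage sketch (not part of the patch itself): the three v2 insert forms that now accept the optional WITH SCHEMA EVOLUTION clause, per the updated insertInto grammar rule and the parser tests above. The catalog, table, and column names are placeholders, and whether the write actually evolves the table schema is up to the target connector.

    // Illustrative only; "testcat.ns.tbl", "source", and column "i" are hypothetical names.
    spark.sql("INSERT WITH SCHEMA EVOLUTION INTO testcat.ns.tbl BY NAME SELECT * FROM source")
    spark.sql("INSERT WITH SCHEMA EVOLUTION OVERWRITE testcat.ns.tbl SELECT * FROM source")
    spark.sql("INSERT WITH SCHEMA EVOLUTION INTO testcat.ns.tbl REPLACE WHERE i = 1 SELECT * FROM source")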
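On the connector side, the only signal this patch forwards is the "mergeSchema" write option that ResolveInsertInto (and the REPLACE WHERE path in AstBuilder) attaches to the v2 write nodes. A rough sketch of how a DataSource V2 connector could observe it; the helper below is hypothetical, while CaseInsensitiveStringMap.getBoolean and LogicalWriteInfo.options() are existing DSv2 APIs.

    import org.apache.spark.sql.util.CaseInsensitiveStringMap

    // Hypothetical helper: a connector's WriteBuilder receives these options via
    // LogicalWriteInfo.options(); with this patch, "mergeSchema" is "true" only when
    // the statement used INSERT WITH SCHEMA EVOLUTION.
    def schemaEvolutionRequested(options: CaseInsensitiveStringMap): Boolean =
      options.getBoolean("mergeSchema", false)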