diff --git a/auron-build.sh b/auron-build.sh
index d947b79e7..b587bf4f0 100755
--- a/auron-build.sh
+++ b/auron-build.sh
@@ -30,7 +30,7 @@
 # Define constants for supported component versions
 # -----------------------------------------------------------------------------
 SUPPORTED_OS_IMAGES=("centos7" "ubuntu24" "rockylinux8" "debian11" "azurelinux3")
-SUPPORTED_SPARK_VERSIONS=("3.0" "3.1" "3.2" "3.3" "3.4" "3.5")
+SUPPORTED_SPARK_VERSIONS=("3.0" "3.1" "3.2" "3.3" "3.4" "3.5" "4.0")
 SUPPORTED_SCALA_VERSIONS=("2.12" "2.13")
 SUPPORTED_CELEBORN_VERSIONS=("0.5" "0.6")
 # Currently only one supported version, but kept plural for consistency
diff --git a/auron-spark-ui/pom.xml b/auron-spark-ui/pom.xml
index 08ba20396..e21d23e1f 100644
--- a/auron-spark-ui/pom.xml
+++ b/auron-spark-ui/pom.xml
@@ -36,6 +36,12 @@
       <artifactId>spark-sql_${scalaVersion}</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.auron</groupId>
+      <artifactId>spark-version-annotation-macros_${scalaVersion}</artifactId>
+      <version>${project.version}</version>
+      <scope>compile</scope>
+    </dependency>
diff --git a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala
index c237557ff..254c892be 100644
--- a/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala
+++ b/auron-spark-ui/src/main/scala/org/apache/spark/sql/execution/ui/AuronAllExecutionsPage.scala
@@ -16,18 +16,44 @@
  */
 package org.apache.spark.sql.execution.ui
 
-import javax.servlet.http.HttpServletRequest
-
 import scala.xml.{Node, NodeSeq}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
+import org.apache.auron.sparkver
+
 private[ui] class AuronAllExecutionsPage(parent: AuronSQLTab) extends WebUIPage("") with Logging {
 
   private val sqlStore = parent.sqlStore
 
-  override def render(request: HttpServletRequest): Seq[Node] = {
+  @sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5")
+  override def render(request: javax.servlet.http.HttpServletRequest): Seq[Node] = {
+    val buildInfo = sqlStore.buildInfo()
+    val infos =
+      UIUtils.listingTable(propertyHeader, propertyRow, buildInfo.info, fixedWidth = true)
+    val summary: NodeSeq =
+      <div>
+        <span class="collapse-aggregated-buildInformation collapse-table"
+            onClick="collapseTable('collapse-aggregated-buildInformation',
+            'aggregated-buildInformation')">
+          <h4>
+            <span class="collapse-table-arrow arrow-open"></span>
+            <a>Auron Build Information</a>
+          </h4>
+        </span>
+        <div class="aggregated-buildInformation collapse-aggregated-buildInformation">
+          {infos}
+        </div>
+      </div>
+ + UIUtils.headerSparkPage(request, "Auron", summary, parent) + } + + @sparkver("4.0") + override def render(request: jakarta.servlet.http.HttpServletRequest): Seq[Node] = { val buildInfo = sqlStore.buildInfo() val infos = UIUtils.listingTable(propertyHeader, propertyRow, buildInfo.info, fixedWidth = true) diff --git a/pom.xml b/pom.xml index 63ebf1873..bdf451ff4 100644 --- a/pom.xml +++ b/pom.xml @@ -759,6 +759,47 @@ + + spark-4.0 + + spark-4.0 + spark-extension-shims-spark4 + 3.2.10 + 3.9.9 + 4.0.1 + + + + + org.apache.maven.plugins + maven-enforcer-plugin + 3.4.1 + + + enforce-java-scala-version + + enforce + + + + + [17,) + Spark 4.0 requires JDK 17 or higher to compile! + + + scalaVersion + ^2.13.* + Spark 4.0 requires Scala 2.13 and is not compatible with Scala 2.12! + + + + + + + + + + jdk-8 diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/InterceptedValidateSparkPlan.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/InterceptedValidateSparkPlan.scala index 95aefa65f..ef38d0045 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/InterceptedValidateSparkPlan.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/InterceptedValidateSparkPlan.scala @@ -23,7 +23,7 @@ import org.apache.auron.sparkver object InterceptedValidateSparkPlan extends Logging { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") def validate(plan: SparkPlan): Unit = { import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec import org.apache.spark.sql.execution.auron.plan.NativeRenameColumnsBase @@ -76,7 +76,7 @@ object InterceptedValidateSparkPlan extends Logging { throw new UnsupportedOperationException("validate is not supported in spark 3.0.3 or 3.1.3") } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") private def errorOnInvalidBroadcastQueryStage(plan: SparkPlan): Unit = { import org.apache.spark.sql.execution.adaptive.InvalidAQEPlanException throw InvalidAQEPlanException("Invalid broadcast query stage", plan) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala index cb9492c9c..b6f4f0614 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala @@ -128,8 +128,10 @@ class ShimsImpl extends Shims with Logging { override def shimVersion: String = "spark-3.4" @sparkver("3.5") override def shimVersion: String = "spark-3.5" + @sparkver("4.0") + override def shimVersion: String = "spark-4.0" - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def initExtension(): Unit = { ValidateSparkPlanInjector.inject() @@ -437,7 +439,7 @@ class ShimsImpl extends Shims with Logging { length: Long, numRecords: Long): FileSegment = new FileSegment(file, offset, length) - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def commit( dep: ShuffleDependency[_, _, _], shuffleBlockResolver: IndexShuffleBlockResolver, @@ -598,7 +600,7 @@ class ShimsImpl extends Shims with Logging { expr.asInstanceOf[AggregateExpression].filter } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") private def isAQEShuffleRead(exec: SparkPlan): Boolean = { import 
org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec exec.isInstanceOf[AQEShuffleReadExec] @@ -610,7 +612,7 @@ class ShimsImpl extends Shims with Logging { exec.isInstanceOf[CustomShuffleReaderExec] } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") private def executeNativeAQEShuffleReader(exec: SparkPlan): NativeRDD = { import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec import org.apache.spark.sql.execution.CoalescedMapperPartitionSpec @@ -910,7 +912,7 @@ class ShimsImpl extends Shims with Logging { } } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def getSqlContext(sparkPlan: SparkPlan): SQLContext = sparkPlan.session.sqlContext @@ -932,7 +934,7 @@ class ShimsImpl extends Shims with Logging { size: Long): PartitionedFile = PartitionedFile(partitionValues, filePath, offset, size) - @sparkver("3.4 / 3.5") + @sparkver("3.4 / 3.5 / 4.0") override def getPartitionedFile( partitionValues: InternalRow, filePath: String, @@ -943,7 +945,7 @@ class ShimsImpl extends Shims with Logging { PartitionedFile(partitionValues, SparkPath.fromPath(new Path(filePath)), offset, size) } - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def getMinPartitionNum(sparkSession: SparkSession): Int = sparkSession.sessionState.conf.filesMinPartitionNum .getOrElse(sparkSession.sparkContext.defaultParallelism) @@ -965,13 +967,13 @@ class ShimsImpl extends Shims with Logging { } } - @sparkver("3.4 / 3.5") + @sparkver("3.4 / 3.5 / 4.0") private def convertPromotePrecision( e: Expression, isPruningExpr: Boolean, fallback: Expression => pb.PhysicalExprNode): Option[pb.PhysicalExprNode] = None - @sparkver("3.3 / 3.4 / 3.5") + @sparkver("3.3 / 3.4 / 3.5 / 4.0") private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = { import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate agg match { @@ -1000,7 +1002,7 @@ class ShimsImpl extends Shims with Logging { @sparkver("3.0 / 3.1 / 3.2") private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = None - @sparkver("3.3 / 3.4 / 3.5") + @sparkver("3.3 / 3.4 / 3.5 / 4.0") private def convertBloomFilterMightContain( e: Expression, isPruningExpr: Boolean, @@ -1034,7 +1036,7 @@ class ShimsImpl extends Shims with Logging { exec.initialPlan } - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan = { exec.inputPlan } @@ -1064,7 +1066,7 @@ class ShimsImpl extends Shims with Logging { }) } - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def getJoinBuildSide(exec: SparkPlan): JoinBuildSide = { import org.apache.spark.sql.catalyst.optimizer.BuildLeft convertJoinBuildSide( @@ -1079,7 +1081,7 @@ class ShimsImpl extends Shims with Logging { case class ForceNativeExecutionWrapper(override val child: SparkPlan) extends ForceNativeExecutionWrapperBase(child) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) @@ -1094,6 +1096,6 @@ case class NativeExprWrapper( override val nullable: Boolean) extends NativeExprWrapperBase(nativeExpr, dataType, nullable) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def withNewChildrenInternal(newChildren: 
IndexedSeq[Expression]): Expression = copy() } diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeExec.scala index 5231bd113..0ef4e8b51 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/ConvertToNativeExec.scala @@ -22,7 +22,7 @@ import org.apache.auron.sparkver case class ConvertToNativeExec(override val child: SparkPlan) extends ConvertToNativeBase(child) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala index 7e4b9d6f9..8c836ceba 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeAggExec.scala @@ -48,11 +48,11 @@ case class NativeAggExec( child) with BaseAggregateExec { - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") override val requiredChildDistributionExpressions: Option[Seq[Expression]] = theRequiredChildDistributionExpressions - @sparkver("3.3 / 3.4 / 3.5") + @sparkver("3.3 / 3.4 / 3.5 / 4.0") override val initialInputBufferOffset: Int = theInitialInputBufferOffset override def output: Seq[Attribute] = @@ -64,15 +64,15 @@ case class NativeAggExec( ExprId.apply(NativeAggBase.AGG_BUF_COLUMN_EXPR_ID)) } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def isStreaming: Boolean = false - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def numShufflePartitions: Option[Int] = None override def resultExpressions: Seq[NamedExpression] = output - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeExec.scala index f2f43f327..0e86eb018 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeExec.scala @@ -43,7 +43,7 @@ case class NativeBroadcastExchangeExec(mode: BroadcastMode, override val child: relationFuturePromise.future } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala index ba514ab7d..d281501c3 100644 
--- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeCollectLimitExec.scala @@ -23,7 +23,7 @@ import org.apache.auron.sparkver case class NativeCollectLimitExec(limit: Int, override val child: SparkPlan) extends NativeCollectLimitBase(limit, child) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandExec.scala index ca4aba5e0..e16d62aef 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeExpandExec.scala @@ -28,7 +28,7 @@ case class NativeExpandExec( override val child: SparkPlan) extends NativeExpandBase(projections, output, child) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterExec.scala index 367ad7cc5..61d113bae 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeFilterExec.scala @@ -24,7 +24,7 @@ import org.apache.auron.sparkver case class NativeFilterExec(condition: Expression, override val child: SparkPlan) extends NativeFilterBase(condition, child) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateExec.scala index f0019c669..2ed689aab 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGenerateExec.scala @@ -30,7 +30,7 @@ case class NativeGenerateExec( override val child: SparkPlan) extends NativeGenerateBase(generator, requiredChildOutput, outer, generatorOutput, child) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala index 8aba2de4c..809e1c233 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala +++ 
b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeGlobalLimitExec.scala @@ -23,7 +23,7 @@ import org.apache.auron.sparkver case class NativeGlobalLimitExec(limit: Long, override val child: SparkPlan) extends NativeGlobalLimitBase(limit, child) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala index ad74b02ec..e12e5be0d 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeLocalLimitExec.scala @@ -23,7 +23,7 @@ import org.apache.auron.sparkver case class NativeLocalLimitExec(limit: Long, override val child: SparkPlan) extends NativeLocalLimitBase(limit, child) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala index 4d559b32f..68ac64500 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableExec.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution.auron.plan import org.apache.spark.sql.Row -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.auron.Shims import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -70,7 +69,26 @@ case class NativeParquetInsertIntoHiveTableExec( metrics) } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("4.0") + override protected def getInsertIntoHiveTableCommand( + table: CatalogTable, + partition: Map[String, Option[String]], + query: LogicalPlan, + overwrite: Boolean, + ifPartitionNotExists: Boolean, + outputColumnNames: Seq[String], + metrics: Map[String, SQLMetric]): InsertIntoHiveTable = { + new AuronInsertIntoHiveTable40( + table, + partition, + query, + overwrite, + ifPartitionNotExists, + outputColumnNames, + metrics) + } + + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) @@ -263,7 +281,56 @@ case class NativeParquetInsertIntoHiveTableExec( override lazy val metrics: Map[String, SQLMetric] = outerMetrics - override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { + override def run( + sparkSession: org.apache.spark.sql.SparkSession, + child: SparkPlan): Seq[Row] = { + val nativeParquetSink = + Shims.get.createNativeParquetSinkExec(sparkSession, table, partition, child, metrics) + super.run(sparkSession, nativeParquetSink) + } + } + + class AuronInsertIntoHiveTable40( + table: CatalogTable, + partition: Map[String, Option[String]], + query: LogicalPlan, + overwrite: Boolean, + 
ifPartitionNotExists: Boolean, + outputColumnNames: Seq[String], + outerMetrics: Map[String, SQLMetric]) + extends { + private val insertIntoHiveTable = InsertIntoHiveTable( + table, + partition, + query, + overwrite, + ifPartitionNotExists, + outputColumnNames) + private val initPartitionColumns = insertIntoHiveTable.partitionColumns + private val initBucketSpec = insertIntoHiveTable.bucketSpec + private val initOptions = insertIntoHiveTable.options + private val initFileFormat = insertIntoHiveTable.fileFormat + private val initHiveTmpPath = insertIntoHiveTable.hiveTmpPath + + } + with InsertIntoHiveTable( + table, + partition, + query, + overwrite, + ifPartitionNotExists, + outputColumnNames, + initPartitionColumns, + initBucketSpec, + initOptions, + initFileFormat, + initHiveTmpPath) { + + override lazy val metrics: Map[String, SQLMetric] = outerMetrics + + override def run( + sparkSession: org.apache.spark.sql.classic.SparkSession, + child: SparkPlan): Seq[Row] = { val nativeParquetSink = Shims.get.createNativeParquetSinkExec(sparkSession, table, partition, child, metrics) super.run(sparkSession, nativeParquetSink) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkExec.scala index 5cbb61eff..736c8b3fd 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetSinkExec.scala @@ -31,7 +31,7 @@ case class NativeParquetSinkExec( override val metrics: Map[String, SQLMetric]) extends NativeParquetSinkBase(sparkSession, table, partition, child, metrics) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala index e243e6f37..63e949da7 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativePartialTakeOrderedExec.scala @@ -29,7 +29,7 @@ case class NativePartialTakeOrderedExec( override val metrics: Map[String, SQLMetric]) extends NativePartialTakeOrderedBase(limit, sortOrder, child, metrics) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectExecProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectExecProvider.scala index 518454003..e2ee17830 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectExecProvider.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeProjectExecProvider.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.auron.sparkver case object 
NativeProjectExecProvider { - @sparkver("3.4 / 3.5") + @sparkver("3.4 / 3.5 / 4.0") def provide(projectList: Seq[NamedExpression], child: SparkPlan): NativeProjectBase = { import org.apache.spark.sql.execution.OrderPreservingUnaryExecNode import org.apache.spark.sql.execution.PartitioningPreservingUnaryExecNode diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsExecProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsExecProvider.scala index 3ba34ba09..3490a9f50 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsExecProvider.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeRenameColumnsExecProvider.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.auron.sparkver case object NativeRenameColumnsExecProvider { - @sparkver("3.4 / 3.5") + @sparkver("3.4 / 3.5 / 4.0") def provide(child: SparkPlan, renamedColumnNames: Seq[String]): NativeRenameColumnsBase = { import org.apache.spark.sql.catalyst.expressions.NamedExpression import org.apache.spark.sql.catalyst.expressions.SortOrder diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala index 37952b205..b8802d1a2 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeShuffleExchangeExec.scala @@ -124,6 +124,7 @@ case class NativeShuffleExchangeExec( new SQLShuffleWriteMetricsReporter(context.taskMetrics().shuffleWriteMetrics, metrics) } + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") override def write( rdd: RDD[_], dep: ShuffleDependency[_, _, _], @@ -157,13 +158,53 @@ } writer.stop(true).get } + + @sparkver("4.0") + override def write( + inputs: Iterator[_], + dep: ShuffleDependency[_, _, _], + mapId: Long, + mapIndex: Int, + context: TaskContext): MapStatus = { + + // [SPARK-44605][CORE] refined the internal ShuffleWriteProcessor API: write() now receives + // the record iterator and map index instead of the RDD, so the partition is recomputed from dep.rdd here. + val rdd = dep.rdd + val partition = rdd.partitions(mapIndex) + + val writer = SparkEnv.get.shuffleManager.getWriter( + dep.shuffleHandle, + mapId, + context, + createMetricsReporter(context)) + + writer match { + case writer: AuronRssShuffleWriterBase[_, _] => + writer.nativeRssShuffleWrite( + rdd.asInstanceOf[MapPartitionsRDD[_, _]].prev.asInstanceOf[NativeRDD], + dep, + mapId.toInt, + context, + partition, + numPartitions) + + case writer: AuronShuffleWriterBase[_, _] => + writer.nativeShuffleWrite( + rdd.asInstanceOf[MapPartitionsRDD[_, _]].prev.asInstanceOf[NativeRDD], + dep, + mapId.toInt, + context, + partition) + } + writer.stop(true).get + } } } // for databricks testing val causedBroadcastJoinBuildOOM = false - @sparkver("3.5") + @sparkver("3.5 / 4.0") override def advisoryPartitionSize: Option[Long] = None // If users specify the num partitions via APIs like `repartition`, we shouldn't change it. 
@@ -172,17 +213,22 @@ case class NativeShuffleExchangeExec( override def canChangeNumPartitions: Boolean = outputPartitioning != SinglePartition - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def shuffleOrigin = { import org.apache.spark.sql.execution.exchange.ShuffleOrigin; _shuffleOrigin.get.asInstanceOf[ShuffleOrigin] } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) @sparkver("3.0 / 3.1") override def withNewChildren(newChildren: Seq[SparkPlan]): SparkPlan = copy(child = newChildren.head) + + @sparkver("4.0") + override def shuffleId: Int = { + shuffleDependency.shuffleId; + } } diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortExec.scala index 05cc7236e..26809d116 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeSortExec.scala @@ -27,7 +27,7 @@ case class NativeSortExec( override val child: SparkPlan) extends NativeSortBase(sortOrder, global, child) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala index cec298b64..3b820e88b 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeTakeOrderedExec.scala @@ -27,7 +27,7 @@ case class NativeTakeOrderedExec( override val child: SparkPlan) extends NativeTakeOrderedBase(limit, sortOrder, child) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionExec.scala index 665346e31..130697b78 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeUnionExec.scala @@ -26,7 +26,7 @@ case class NativeUnionExec( override val output: Seq[Attribute]) extends NativeUnionBase(children, output) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildrenInternal(newChildren: IndexedSeq[SparkPlan]): SparkPlan = copy(children = newChildren) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowExec.scala index ac5ff0457..d14647c0b 100644 --- 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeWindowExec.scala @@ -31,7 +31,7 @@ case class NativeWindowExec( override val child: SparkPlan) extends NativeWindowBase(windowExpression, partitionSpec, orderSpec, groupLimit, child) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan = copy(child = newChild) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReader.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReader.scala index e08d8490e..2ee398ed5 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReader.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronBlockStoreShuffleReader.scala @@ -38,7 +38,7 @@ class AuronBlockStoreShuffleReader[K, C]( with Logging { override def readBlocks(): Iterator[InputStream] = { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") def fetchIterator = new ShuffleBlockFetcherIterator( context, blockManager.blockStoreClient, diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleManagerBase.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleManagerBase.scala index 98863bf6a..a0092ef61 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleManagerBase.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronRssShuffleManagerBase.scala @@ -74,7 +74,7 @@ abstract class AuronRssShuffleManagerBase(_conf: SparkConf) extends ShuffleManag context: TaskContext, metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def getReader[K, C]( handle: ShuffleHandle, startMapIndex: Int, diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleManager.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleManager.scala index 4b84de3d5..a6c8903fb 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleManager.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleManager.scala @@ -52,7 +52,7 @@ class AuronShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { sortShuffleManager.registerShuffle(shuffleId, dependency) } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def getReader[K, C]( handle: ShuffleHandle, startMapIndex: Int, @@ -67,7 +67,7 @@ class AuronShuffleManager(conf: SparkConf) extends ShuffleManager with Logging { @sparkver("3.2") def shuffleMergeFinalized = baseShuffleHandle.dependency.shuffleMergeFinalized - @sparkver("3.3 / 3.4 / 3.5") + @sparkver("3.3 / 3.4 / 3.5 / 4.0") def shuffleMergeFinalized = baseShuffleHandle.dependency.isShuffleMergeFinalizedMarked val (blocksByAddress, canEnableBatchFetch) = diff --git 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleWriter.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleWriter.scala index 2ba997109..c94dd2e45 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleWriter.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/AuronShuffleWriter.scala @@ -23,6 +23,6 @@ import org.apache.auron.sparkver class AuronShuffleWriter[K, V](metrics: ShuffleWriteMetricsReporter) extends AuronShuffleWriterBase[K, V](metrics) { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def getPartitionLengths(): Array[Long] = partitionLengths } diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala index fd51bec3d..609908159 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeBroadcastJoinExec.scala @@ -48,7 +48,7 @@ case class NativeBroadcastJoinExec( override val condition: Option[Expression] = None - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def buildSide: org.apache.spark.sql.catalyst.optimizer.BuildSide = broadcastSide match { case JoinBuildLeft => org.apache.spark.sql.catalyst.optimizer.BuildLeft @@ -61,7 +61,7 @@ case class NativeBroadcastJoinExec( case JoinBuildRight => org.apache.spark.sql.execution.joins.BuildRight } - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def requiredChildDistribution = { import org.apache.spark.sql.catalyst.plans.physical.BroadcastDistribution import org.apache.spark.sql.catalyst.plans.physical.UnspecifiedDistribution @@ -79,22 +79,22 @@ case class NativeBroadcastJoinExec( override def rewriteKeyExprToLong(exprs: Seq[Expression]): Seq[Expression] = HashJoin.rewriteKeyExpr(exprs) - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def supportCodegen: Boolean = false - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") override def inputRDDs() = { throw new NotImplementedError("NativeBroadcastJoin dose not support codegen") } - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def prepareRelation( ctx: org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext) : org.apache.spark.sql.execution.joins.HashedRelationInfo = { throw new NotImplementedError("NativeBroadcastJoin dose not support codegen") } - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") override protected def withNewChildrenInternal( newLeft: SparkPlan, newRight: SparkPlan): SparkPlan = diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeShuffledHashJoinExecProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeShuffledHashJoinExecProvider.scala index 0236dd260..614f73e13 100644 --- 
a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeShuffledHashJoinExecProvider.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeShuffledHashJoinExecProvider.scala @@ -27,7 +27,7 @@ import org.apache.auron.sparkver case object NativeShuffledHashJoinExecProvider { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") def provide( left: SparkPlan, right: SparkPlan, diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeSortMergeJoinExecProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeSortMergeJoinExecProvider.scala index 067ae0715..4c39c423e 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeSortMergeJoinExecProvider.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/execution/joins/auron/plan/NativeSortMergeJoinExecProvider.scala @@ -25,7 +25,7 @@ import org.apache.auron.sparkver case object NativeSortMergeJoinExecProvider { - @sparkver("3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.2 / 3.3 / 3.4 / 3.5 / 4.0") def provide( left: SparkPlan, right: SparkPlan, diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala index c7e2f6a9f..3a511078f 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronFunctionSuite.scala @@ -279,7 +279,7 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite { val dateTimeStampMax = format.parse(dateStringMax).getTime format = new SimpleDateFormat("yyyy-MM-dd") val dateString = "2015-01-01" - val date = format.parse(dateString) + format.parse(dateString) val functions = s""" @@ -321,7 +321,7 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite { val dateTimeStampMax = format.parse(dateStringMax).getTime format = new SimpleDateFormat("yyyy-MM-dd") val dateString = "2015-07-01" - val date = format.parse(dateString) + format.parse(dateString) val functions = s""" diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala index 349b489aa..b8aa2237f 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/AuronQuerySuite.scala @@ -210,7 +210,7 @@ class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with AuronSQ withTable("t") { sql(s"CREATE EXTERNAL TABLE t(c3 INT, c2 INT) USING ORC LOCATION '$path'") - val expected = if (forcePositionalEvolution) { + if (forcePositionalEvolution) { correctAnswer } else { Seq(Row(null, 2), Row(null, 4), Row(null, 6), Row(null, null)) diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/AuronAdaptiveQueryExecSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/AuronAdaptiveQueryExecSuite.scala index fc4757b8f..7b228ad31 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/AuronAdaptiveQueryExecSuite.scala +++ 
b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/execution/AuronAdaptiveQueryExecSuite.scala @@ -20,7 +20,7 @@ import org.apache.spark.sql.AuronQueryTest import org.apache.auron.{sparkverEnableMembers, BaseAuronSQLSuite} -@sparkverEnableMembers("3.5") +@sparkverEnableMembers("3.5 / 4.0") class AuronAdaptiveQueryExecSuite extends AuronQueryTest with BaseAuronSQLSuite { import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index 4f124bd8f..f1203ddca 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -415,13 +415,13 @@ object AuronConverters extends Logging { getShuffleOrigin(exec)) } - @sparkver(" 3.2 / 3.3 / 3.4 / 3.5") + @sparkver(" 3.2 / 3.3 / 3.4 / 3.5 / 4.0") def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = exec.isSkewJoin @sparkver("3.0 / 3.1") def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = false - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = Some(exec.shuffleOrigin) @sparkver("3.0") @@ -646,7 +646,7 @@ object AuronConverters extends Logging { } } - @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5") + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0") def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = exec.isNullAwareAntiJoin @sparkver("3.0") diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala index 3ae7669ee..403008e63 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala @@ -74,7 +74,7 @@ object NativeHelper extends Logging { val heapMemory = Runtime.getRuntime.maxMemory() val offheapMemory = totalMemory - heapMemory logWarning(s"memory total: ${Utils.bytesToString(totalMemory)}, onheap: ${Utils.bytesToString( - heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") + heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") offheapMemory } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala index 8e5d7353f..eecec5ef4 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala @@ -47,7 +47,7 @@ object TaskContextHelper extends Logging { val thread = Thread.currentThread() val threadName = if (context != null) { s"auron native task ${context.partitionId()}.${context.attemptNumber()} in stage ${context - .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})" + .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})" } else { "auron native task " + thread.getName } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarArray.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarArray.scala index 988875adc..67a813242 100644 --- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarArray.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarArray.scala @@ -34,6 +34,8 @@ import org.apache.spark.sql.types.TimestampType import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.unsafe.types.UTF8String +import org.apache.auron.sparkver + class AuronColumnarArray(data: AuronColumnVector, offset: Int, length: Int) extends ArrayData { override def numElements: Int = length @@ -154,4 +156,9 @@ class AuronColumnarArray(data: AuronColumnVector, offset: Int, length: Int) exte override def setNullAt(ordinal: Int): Unit = { throw new UnsupportedOperationException } + + @sparkver("4.0") + override def getVariant(i: Int): org.apache.spark.unsafe.types.VariantVal = { + throw new UnsupportedOperationException + } } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarBatchRow.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarBatchRow.scala index 6b24e0f55..65ee91610 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarBatchRow.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarBatchRow.scala @@ -37,6 +37,8 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.unsafe.types.UTF8String +import org.apache.auron.sparkver + class AuronColumnarBatchRow(columns: Array[AuronColumnVector], var rowId: Int = 0) extends InternalRow { override def numFields: Int = columns.length @@ -133,4 +135,9 @@ class AuronColumnarBatchRow(columns: Array[AuronColumnVector], var rowId: Int = override def setNullAt(ordinal: Int): Unit = { throw new UnsupportedOperationException } + + @sparkver("4.0") + override def getVariant(i: Int): org.apache.spark.unsafe.types.VariantVal = { + throw new UnsupportedOperationException + } } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarStruct.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarStruct.scala index 34e7a717d..cfb5debeb 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarStruct.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/columnar/AuronColumnarStruct.scala @@ -37,6 +37,8 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.unsafe.types.CalendarInterval import org.apache.spark.unsafe.types.UTF8String +import org.apache.auron.sparkver + class AuronColumnarStruct(data: AuronColumnVector, rowId: Int) extends InternalRow { override def numFields: Int = data.dataType.asInstanceOf[StructType].size @@ -143,4 +145,9 @@ class AuronColumnarStruct(data: AuronColumnVector, rowId: Int) extends InternalR override def setNullAt(ordinal: Int): Unit = { throw new UnsupportedOperationException } + + @sparkver("4.0") + override def getVariant(i: Int): org.apache.spark.unsafe.types.VariantVal = { + throw new UnsupportedOperationException + } } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala index b37dae194..c36a25090 100644 --- 
a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeBroadcastExchangeBase.scala @@ -63,7 +63,7 @@ import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.BinaryType -import org.apache.auron.{protobuf => pb} +import org.apache.auron.{protobuf => pb, sparkver} import org.apache.auron.jni.JniBridge import org.apache.auron.metric.SparkMetricNode @@ -137,9 +137,21 @@ abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val chi dummyBroadcasted.asInstanceOf[Broadcast[T]] } - def doExecuteBroadcastNative[T](): broadcast.Broadcast[T] = { + @sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5") + def getBroadcastTimeout: Long = { val conf = SparkSession.getActiveSession.map(_.sqlContext.conf).orNull - val timeout: Long = conf.broadcastTimeout + conf.broadcastTimeout + } + + @sparkver("4.0") + def getBroadcastTimeout: Long = { + SparkSession.getActiveSession + .map(_.conf.get("spark.sql.broadcastTimeout").toLong) + .getOrElse(300L) + } + + def doExecuteBroadcastNative[T](): broadcast.Broadcast[T] = { + val timeout: Long = getBroadcastTimeout try { relationFuture.get(timeout, TimeUnit.SECONDS).asInstanceOf[broadcast.Broadcast[T]] } catch { @@ -258,7 +270,31 @@ abstract class NativeBroadcastExchangeBase(mode: BroadcastMode, override val chi lazy val relationFuturePromise: Promise[Broadcast[Any]] = Promise[Broadcast[Any]]() @transient - lazy val relationFuture: Future[Broadcast[Any]] = { + lazy val relationFuture: Future[Broadcast[Any]] = getRelationFuture + + @sparkver("4.0") + private def getRelationFuture = { + SQLExecution.withThreadLocalCaptured[Broadcast[Any]]( + this.session.sqlContext.sparkSession, + BroadcastExchangeExec.executionContext) { + try { + sparkContext.setJobGroup( + getRunId.toString, + s"native broadcast exchange (runId $getRunId)", + interruptOnCancel = true) + val broadcasted = sparkContext.broadcast(collectNative().asInstanceOf[Any]) + relationFuturePromise.trySuccess(broadcasted) + broadcasted + } catch { + case e: Throwable => + relationFuturePromise.tryFailure(e) + throw e + } + } + } + + @sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5") + private def getRelationFuture = { SQLExecution.withThreadLocalCaptured[Broadcast[Any]]( Shims.get.getSqlContext(this).sparkSession, BroadcastExchangeExec.executionContext) { diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala index d43f7d17d..b9fb7f99b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala @@ -69,9 +69,10 @@ abstract class NativeParquetInsertIntoHiveTableBase( .filterKeys(Set("stage_id", "output_rows", "elapsed_compute")) .toSeq :+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time")) - :+ ("bytes_written", - SQLMetrics - .createSizeMetric(sparkContext, "Native.bytes_written")): _*) + :+ ( + "bytes_written", + SQLMetrics + .createSizeMetric(sparkContext, "Native.bytes_written")): _*) def check(): Unit = { val hadoopConf = sparkContext.hadoopConfiguration
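Reviewer note on the mechanism used throughout this change: the spark-version-annotation-macros_${scalaVersion} dependency added to auron-spark-ui/pom.xml brings the @sparkver macro annotation into the UI module. The macro keeps a definition only when the active build profile matches one of the listed versions, so several same-named overrides can coexist in one source file. A minimal sketch of the pattern, using a hypothetical ExampleShim class (the servlet types are the concrete reason AuronAllExecutionsPage.render needs two variants, since Spark 4.0 moved from javax.servlet to jakarta.servlet):

    import org.apache.auron.sparkver

    class ExampleShim {
      // Retained only when compiling against a Spark 3.x profile (javax servlet API).
      @sparkver("3.0 / 3.1 / 3.2 / 3.3 / 3.4 / 3.5")
      def render(request: javax.servlet.http.HttpServletRequest): Unit = ()

      // Retained only under the new spark-4.0 profile (jakarta servlet API).
      @sparkver("4.0")
      def render(request: jakarta.servlet.http.HttpServletRequest): Unit = ()
    }

Because only the matching definition survives macro expansion, the unmatched variant never has to compile against a servlet API that is absent from that profile's classpath. The spark-4.0 profile itself is guarded by the new maven-enforcer-plugin rules in pom.xml: building it requires JDK 17+ and a 2.13 scalaVersion.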
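Likewise, the duplicated write override in NativeShuffleExchangeExec tracks an upstream signature change: SPARK-44605 reworked ShuffleWriteProcessor.write for Spark 4.0. A side-by-side sketch of the two shapes being overridden (parameter lists as in the hunks above; the trailing partition: Partition parameter of the Spark 3.x variant comes from upstream Spark's definition and is cut off by the hunk context here, so treat it as an assumption):

    import org.apache.spark.{Partition, ShuffleDependency, TaskContext}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.scheduler.MapStatus

    trait WriteShapes {
      // Spark 3.1 - 3.5: the processor is handed the map task's RDD and its concrete Partition.
      def write(
          rdd: RDD[_],
          dep: ShuffleDependency[_, _, _],
          mapId: Long,
          context: TaskContext,
          partition: Partition): MapStatus

      // Spark 4.0 (SPARK-44605): the processor receives the record iterator plus the map
      // index; the override above recovers the partition via dep.rdd.partitions(mapIndex).
      def write(
          inputs: Iterator[_],
          dep: ShuffleDependency[_, _, _],
          mapId: Long,
          mapIndex: Int,
          context: TaskContext): MapStatus
    }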