diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
index cb9492c9c..b8e029142 100644
--- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
+++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/auron/ShimsImpl.scala
@@ -96,7 +96,7 @@ import org.apache.spark.sql.execution.auron.plan.NativeWindowBase
 import org.apache.spark.sql.execution.auron.plan.NativeWindowExec
 import org.apache.spark.sql.execution.auron.shuffle.{AuronBlockStoreShuffleReaderBase, AuronRssShuffleManagerBase, RssPartitionWriterBase}
 import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec}
 import org.apache.spark.sql.execution.joins.auron.plan.NativeBroadcastJoinExec
 import org.apache.spark.sql.execution.joins.auron.plan.NativeShuffledHashJoinExecProvider
@@ -1074,6 +1074,25 @@ class ShimsImpl extends Shims with Logging {
       case _ => false
     })
   }
+
+  @sparkver("3.2 / 3.3 / 3.4 / 3.5")
+  override def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = exec.isSkewJoin
+
+  @sparkver("3.0 / 3.1")
+  override def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = false
+
+  @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
+  override def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = Some(exec.shuffleOrigin)
+
+  @sparkver("3.0")
+  override def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = None
+
+  @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
+  override def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean =
+    exec.isNullAwareAntiJoin
+
+  @sparkver("3.0")
+  override def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = false
 }
 
 case class ForceNativeExecutionWrapper(override val child: SparkPlan)
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
index 4f124bd8f..90bc60491 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala
@@ -78,7 +78,6 @@ import org.apache.auron.metric.SparkMetricNode
 import org.apache.auron.protobuf.EmptyPartitionsExecNode
 import org.apache.auron.protobuf.PhysicalPlanNode
 import org.apache.auron.spark.configuration.SparkAuronConfiguration
-import org.apache.auron.sparkver
 
 object AuronConverters extends Logging {
   def enableScan: Boolean =
@@ -412,21 +411,9 @@ object AuronConverters extends Logging {
     Shims.get.createNativeShuffleExchangeExec(
       outputPartitioning,
       addRenameColumnsExec(convertedChild),
-      getShuffleOrigin(exec))
+      Shims.get.getShuffleOrigin(exec))
   }
 
-  @sparkver(" 3.2 / 3.3 / 3.4 / 3.5")
-  def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = exec.isSkewJoin
-
-  @sparkver("3.0 / 3.1")
-  def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = false
-
-  @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
-  def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = Some(exec.shuffleOrigin)
-
-  @sparkver("3.0")
-  def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = None
-
   def convertFileSourceScanExec(exec: FileSourceScanExec): SparkPlan = {
     val (
       relation,
@@ -606,8 +593,7 @@ object AuronConverters extends Logging {
        rightKeys,
        joinType,
        buildSide,
-       getIsSkewJoinFromSHJ(exec))
-
+       Shims.get.getIsSkewJoinFromSHJ(exec))
    } catch {
      case _ if sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN) =>
        logWarning(
@@ -646,12 +632,6 @@ object AuronConverters extends Logging {
     }
   }
 
-  @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
-  def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = exec.isNullAwareAntiJoin
-
-  @sparkver("3.0")
-  def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = false
-
   def convertBroadcastHashJoinExec(exec: BroadcastHashJoinExec): SparkPlan = {
     val buildSide = Shims.get.getJoinBuildSide(exec)
     try {
@@ -663,7 +643,7 @@ object AuronConverters extends Logging {
        exec.condition,
        exec.left,
        exec.right,
-       isNullAwareAntiJoin(exec))
+       Shims.get.isNullAwareAntiJoin(exec))
      logDebugPlanConversion(
        exec,
        Seq(
diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
index d2489726b..2b221d36d 100644
--- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
+++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/Shims.scala
@@ -48,7 +48,8 @@ import org.apache.spark.sql.execution.auron.plan.NativeBroadcastJoinBase
 import org.apache.spark.sql.execution.auron.plan.NativeSortMergeJoinBase
 import org.apache.spark.sql.execution.auron.shuffle.RssPartitionWriterBase
 import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
 import org.apache.spark.sql.types.DataType
@@ -264,6 +265,12 @@ abstract class Shims {
   def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan
 
   def getJoinBuildSide(exec: SparkPlan): JoinBuildSide
+
+  def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean
+
+  def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any]
+
+  def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean
 }
 
 object Shims {
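
Reviewer note (illustration only, not part of the patch): the change moves the @sparkver-gated helpers out of the version-agnostic AuronConverters (hence the dropped `import org.apache.auron.sparkver`) and behind the `Shims` interface, so converter code only ever calls `Shims.get.*` and never touches version-specific Spark APIs such as `ShuffledHashJoinExec.isSkewJoin`. A minimal, self-contained sketch of that dispatch pattern follows; `JoinShims` and `SkewAwareJoin` are hypothetical stand-ins for `Shims` and Spark's ShuffledHashJoinExec, and a plain object reference stands in for the build-time selection that @sparkver performs in the real code.

// Standalone sketch of the shim-dispatch pattern this patch relies on.
// In the real build, @sparkver keeps exactly one overload per Spark
// profile; here we model the two profiles as two concrete objects.
trait JoinShims {
  def getIsSkewJoinFromSHJ(exec: SkewAwareJoin): Boolean
}

// Stand-in for ShuffledHashJoinExec; only Spark 3.2+ exposes isSkewJoin.
final case class SkewAwareJoin(isSkewJoin: Boolean)

// Spark 3.2+ behaviour: the field exists, read it through the shim.
object Spark32Shims extends JoinShims {
  override def getIsSkewJoinFromSHJ(exec: SkewAwareJoin): Boolean = exec.isSkewJoin
}

// Spark 3.0/3.1 behaviour: no such field, the shim answers false.
object Spark30Shims extends JoinShims {
  override def getIsSkewJoinFromSHJ(exec: SkewAwareJoin): Boolean = false
}

object ShimDemo extends App {
  // Converter code sees only the trait, as AuronConverters now sees only Shims.get.
  val shims: JoinShims = Spark32Shims
  println(shims.getIsSkewJoinFromSHJ(SkewAwareJoin(isSkewJoin = true))) // prints: true
}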