Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ import org.apache.spark.sql.execution.auron.plan.NativeWindowBase
import org.apache.spark.sql.execution.auron.plan.NativeWindowExec
import org.apache.spark.sql.execution.auron.shuffle.{AuronBlockStoreShuffleReaderBase, AuronRssShuffleManagerBase, RssPartitionWriterBase}
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, ShuffledHashJoinExec}
import org.apache.spark.sql.execution.joins.auron.plan.NativeBroadcastJoinExec
import org.apache.spark.sql.execution.joins.auron.plan.NativeShuffledHashJoinExecProvider
Expand Down Expand Up @@ -1074,6 +1074,25 @@ class ShimsImpl extends Shims with Logging {
case _ => false
})
}

/**
 * Variant annotated for Spark 3.2 / 3.3 / 3.4 / 3.5: returns the `isSkewJoin` value of the
 * given shuffled hash join node.
 *
 * NOTE(review): presumably `@sparkver` keeps only the variant matching the Spark version being
 * built against -- confirm against the annotation's implementation.
 *
 * @param exec the shuffled hash join plan node to inspect
 * @return `exec.isSkewJoin`
 */
@sparkver("3.2 / 3.3 / 3.4 / 3.5")
override def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = exec.isSkewJoin

/**
 * Variant annotated for Spark 3.0 / 3.1: always returns `false` and does not read `exec`.
 *
 * NOTE(review): presumably `ShuffledHashJoinExec` has no `isSkewJoin` member in these Spark
 * versions, hence the constant -- confirm against the Spark sources.
 *
 * @param exec the shuffled hash join plan node (unused by this variant)
 * @return `false`
 */
@sparkver("3.0 / 3.1")
override def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = false

/**
 * Variant annotated for Spark 3.1 / 3.2 / 3.3 / 3.4 / 3.5: returns the exchange's
 * `shuffleOrigin` wrapped in `Some`.
 *
 * The result is typed `Option[Any]`, so the concrete origin type does not appear in the
 * signature.
 *
 * @param exec the shuffle exchange plan node to inspect
 * @return `Some(exec.shuffleOrigin)`
 */
@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
override def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = Some(exec.shuffleOrigin)

/**
 * Variant annotated for Spark 3.0: always returns `None` and does not read `exec`.
 *
 * NOTE(review): presumably `ShuffleExchangeExec` has no `shuffleOrigin` member in Spark 3.0 --
 * confirm against the Spark sources.
 *
 * @param exec the shuffle exchange plan node (unused by this variant)
 * @return `None`
 */
@sparkver("3.0")
override def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = None

/**
 * Variant annotated for Spark 3.1 / 3.2 / 3.3 / 3.4 / 3.5: returns the `isNullAwareAntiJoin`
 * value of the given broadcast hash join node.
 *
 * @param exec the broadcast hash join plan node to inspect
 * @return `exec.isNullAwareAntiJoin`
 */
@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
override def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean =
exec.isNullAwareAntiJoin

/**
 * Variant annotated for Spark 3.0: always returns `false` and does not read `exec`.
 *
 * NOTE(review): presumably `BroadcastHashJoinExec` has no `isNullAwareAntiJoin` member in
 * Spark 3.0 -- confirm against the Spark sources.
 *
 * @param exec the broadcast hash join plan node (unused by this variant)
 * @return `false`
 */
@sparkver("3.0")
override def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = false
}

case class ForceNativeExecutionWrapper(override val child: SparkPlan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ import org.apache.auron.metric.SparkMetricNode
import org.apache.auron.protobuf.EmptyPartitionsExecNode
import org.apache.auron.protobuf.PhysicalPlanNode
import org.apache.auron.spark.configuration.SparkAuronConfiguration
import org.apache.auron.sparkver

object AuronConverters extends Logging {
def enableScan: Boolean =
Expand Down Expand Up @@ -412,21 +411,9 @@ object AuronConverters extends Logging {
Shims.get.createNativeShuffleExchangeExec(
outputPartitioning,
addRenameColumnsExec(convertedChild),
getShuffleOrigin(exec))
Shims.get.getShuffleOrigin(exec))
}

/**
 * Variant annotated for Spark 3.2 / 3.3 / 3.4 / 3.5: returns the `isSkewJoin` value of the
 * given shuffled hash join node.
 *
 * NOTE(review): the annotation string below starts with a space (" 3.2 / ..."), unlike the
 * other `@sparkver` strings here -- confirm that the annotation tolerates leading whitespace.
 *
 * @param exec the shuffled hash join plan node to inspect
 * @return `exec.isSkewJoin`
 */
@sparkver(" 3.2 / 3.3 / 3.4 / 3.5")
def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = exec.isSkewJoin

/**
 * Variant annotated for Spark 3.0 / 3.1: always returns `false` and does not read `exec`.
 *
 * @param exec the shuffled hash join plan node (unused by this variant)
 * @return `false`
 */
@sparkver("3.0 / 3.1")
def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = false

/**
 * Variant annotated for Spark 3.1 / 3.2 / 3.3 / 3.4 / 3.5: returns the exchange's
 * `shuffleOrigin` wrapped in `Some`, typed as `Option[Any]`.
 *
 * @param exec the shuffle exchange plan node to inspect
 * @return `Some(exec.shuffleOrigin)`
 */
@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = Some(exec.shuffleOrigin)

/**
 * Variant annotated for Spark 3.0: always returns `None` and does not read `exec`.
 *
 * @param exec the shuffle exchange plan node (unused by this variant)
 * @return `None`
 */
@sparkver("3.0")
def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = None

def convertFileSourceScanExec(exec: FileSourceScanExec): SparkPlan = {
val (
relation,
Expand Down Expand Up @@ -606,8 +593,7 @@ object AuronConverters extends Logging {
rightKeys,
joinType,
buildSide,
getIsSkewJoinFromSHJ(exec))

Shims.get.getIsSkewJoinFromSHJ(exec))
} catch {
case _ if sparkAuronConfig.getBoolean(SparkAuronConfiguration.FORCE_SHUFFLED_HASH_JOIN) =>
logWarning(
Expand Down Expand Up @@ -646,12 +632,6 @@ object AuronConverters extends Logging {
}
}

/**
 * Variant annotated for Spark 3.1 / 3.2 / 3.3 / 3.4 / 3.5: returns the `isNullAwareAntiJoin`
 * value of the given broadcast hash join node.
 *
 * @param exec the broadcast hash join plan node to inspect
 * @return `exec.isNullAwareAntiJoin`
 */
@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = exec.isNullAwareAntiJoin

/**
 * Variant annotated for Spark 3.0: always returns `false` and does not read `exec`.
 *
 * @param exec the broadcast hash join plan node (unused by this variant)
 * @return `false`
 */
@sparkver("3.0")
def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = false

def convertBroadcastHashJoinExec(exec: BroadcastHashJoinExec): SparkPlan = {
val buildSide = Shims.get.getJoinBuildSide(exec)
try {
Expand All @@ -663,7 +643,7 @@ object AuronConverters extends Logging {
exec.condition,
exec.left,
exec.right,
isNullAwareAntiJoin(exec))
Shims.get.isNullAwareAntiJoin(exec))
logDebugPlanConversion(
exec,
Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ import org.apache.spark.sql.execution.auron.plan.NativeBroadcastJoinBase
import org.apache.spark.sql.execution.auron.plan.NativeSortMergeJoinBase
import org.apache.spark.sql.execution.auron.shuffle.RssPartitionWriterBase
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.spark.sql.types.DataType
Expand Down Expand Up @@ -264,6 +265,12 @@ abstract class Shims {
/**
 * Abstract shim: maps an adaptive plan node to a `SparkPlan`.
 *
 * NOTE(review): going by the name, this presumably returns the input plan wrapped by the
 * adaptive node -- confirm against the concrete implementation.
 *
 * @param exec the adaptive plan node
 * @return a `SparkPlan` obtained from `exec`
 */
def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan

/**
 * Abstract shim: returns the `JoinBuildSide` for the given plan node.
 *
 * NOTE(review): the parameter is typed as a general `SparkPlan`; presumably only join nodes
 * are valid inputs -- confirm against the concrete implementation.
 *
 * @param exec the plan node to inspect
 * @return the build side reported for `exec`
 */
def getJoinBuildSide(exec: SparkPlan): JoinBuildSide

/**
 * Abstract shim: reports whether the given shuffled hash join is a skew join.
 *
 * Left abstract here for implementations to define.
 *
 * @param exec the shuffled hash join plan node to inspect
 * @return the skew-join flag for `exec`
 */
def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean

/**
 * Abstract shim: returns the shuffle origin of the given exchange, if any.
 *
 * The result is typed `Option[Any]`, so the concrete origin type does not appear in this
 * signature.
 *
 * @param exec the shuffle exchange plan node to inspect
 * @return the origin wrapped in an `Option`
 */
def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any]

/**
 * Abstract shim: reports whether the given broadcast hash join is a null-aware anti join.
 *
 * Left abstract here for implementations to define.
 *
 * @param exec the broadcast hash join plan node to inspect
 * @return the null-aware-anti-join flag for `exec`
 */
def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean
}

object Shims {
Expand Down