diff --git a/rust/sedona-common/src/option.rs b/rust/sedona-common/src/option.rs
index e5290688f..280a4705e 100644
--- a/rust/sedona-common/src/option.rs
+++ b/rust/sedona-common/src/option.rs
@@ -71,6 +71,11 @@ config_namespace! {
         /// Include tie-breakers in KNN join results when there are tied distances
         pub knn_include_tie_breakers: bool, default = false
 
+        /// Repartition the probe side before performing spatial join. This can improve performance by
+        /// balancing the workload, especially for skewed datasets or large sorted datasets where spatial
+        /// locality might cause imbalanced partitions when running out-of-core spatial join.
+        pub repartition_probe_side: bool, default = true
+
         /// Maximum number of sample bounding boxes collected from the index side for partitioning the
         /// data when running out-of-core spatial join
         pub max_index_side_bbox_samples: usize, default = 10000
diff --git a/rust/sedona-spatial-join/src/planner/physical_planner.rs b/rust/sedona-spatial-join/src/planner/physical_planner.rs
index cb8b3b7f9..d03411542 100644
--- a/rust/sedona-spatial-join/src/planner/physical_planner.rs
+++ b/rust/sedona-spatial-join/src/planner/physical_planner.rs
@@ -27,12 +27,15 @@ use datafusion::execution::context::QueryPlanner;
 use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
-use datafusion_common::{plan_err, DFSchema, Result};
+use datafusion_common::{plan_err, DFSchema, JoinSide, Result};
 use datafusion_expr::logical_plan::UserDefinedLogicalNode;
 use datafusion_expr::LogicalPlan;
 use datafusion_physical_expr::create_physical_expr;
+use datafusion_physical_expr::Partitioning;
 use datafusion_physical_plan::joins::utils::JoinFilter;
 use datafusion_physical_plan::joins::NestedLoopJoinExec;
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::ExecutionPlanProperties;
 use sedona_common::sedona_internal_err;
 
 use crate::exec::SpatialJoinExec;
@@ -152,6 +155,22 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
             && join_type.supports_swap()
             && should_swap_join_order(physical_left.as_ref(), physical_right.as_ref())?;
 
+        // Repartition the probe side when enabled. This breaks spatial locality in sorted/skewed
+        // datasets, leading to more balanced workloads during out-of-core spatial join.
+        // We determine which pre-swap input will be the probe AFTER any potential swap, and
+        // repartition it here. swap_inputs() will then carry the RepartitionExec to the correct
+        // child position.
+        let (physical_left, physical_right) = if ext.spatial_join.repartition_probe_side {
+            repartition_probe_side(
+                physical_left,
+                physical_right,
+                &spatial_predicate,
+                should_swap,
+            )?
+        } else {
+            (physical_left, physical_right)
+        };
+
         let exec = SpatialJoinExec::try_new(
             physical_left,
             physical_right,
@@ -264,3 +283,60 @@ fn logical_join_filter_to_physical(
     let join_filter = JoinFilter::new(filter_expr, column_indices, Arc::new(filter_schema));
     Ok(join_filter)
 }
+
+/// Repartition the probe side of a spatial join using `RoundRobinBatch` partitioning.
+///
+/// The purpose is to break spatial locality in sorted or skewed datasets, which can cause
+/// imbalanced partitions when running out-of-core spatial join. The number of partitions is
+/// preserved; only the distribution of rows across partitions is shuffled.
+///
+/// The `should_swap` parameter indicates whether `swap_inputs()` will be called after
+/// `SpatialJoinExec` is constructed. This affects which pre-swap input will become the
+/// probe side:
+/// - For non-KNN predicates: probe is always `Right` after any swap. If `should_swap` is true,
+///   the current `left` will become `right` (probe) after swap, so we repartition `left`.
+/// - For KNN predicates: `should_swap` is always false, and the probe side is determined by
+///   `KNNPredicate::probe_side`.
+///
+/// Returns the `(left, right)` pair with the probe-side plan wrapped in a `RepartitionExec`;
+/// the other plan is returned unchanged.
+///
+/// # Errors
+///
+/// Propagates any error returned by `RepartitionExec::try_new`.
+fn repartition_probe_side(
+    mut physical_left: Arc<dyn ExecutionPlan>,
+    mut physical_right: Arc<dyn ExecutionPlan>,
+    spatial_predicate: &SpatialPredicate,
+    should_swap: bool,
+) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
+    let probe_plan = match spatial_predicate {
+        SpatialPredicate::KNearestNeighbors(knn) => match knn.probe_side {
+            JoinSide::Left => &mut physical_left,
+            JoinSide::Right => &mut physical_right,
+            JoinSide::None => {
+                // KNNPredicate::probe_side is asserted not to be None in its constructor;
+                // treat this as a debug-only invariant violation and default to right.
+                debug_assert!(false, "KNNPredicate::probe_side must not be JoinSide::None");
+                &mut physical_right
+            }
+        },
+        _ => {
+            // For Relation/Distance predicates, probe is always Right after swap.
+            // If should_swap, the current left will be moved to the right (probe) by swap_inputs().
+            if should_swap {
+                &mut physical_left
+            } else {
+                &mut physical_right
+            }
+        }
+    };
+
+    let num_partitions = probe_plan.output_partitioning().partition_count();
+    *probe_plan = Arc::new(RepartitionExec::try_new(
+        Arc::clone(probe_plan),
+        Partitioning::RoundRobinBatch(num_partitions),
+    )?);
+
+    Ok((physical_left, physical_right))
+}