Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rust/sedona-common/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ config_namespace! {
/// Include tie-breakers in KNN join results when there are tied distances
pub knn_include_tie_breakers: bool, default = false

/// Repartition the probe side before performing spatial join. This can improve performance by
/// balancing the workload, especially for skewed datasets or large sorted datasets where spatial
/// locality might cause imbalanced partitions when running out-of-core spatial join.
pub repartition_probe_side: bool, default = true

/// Maximum number of sample bounding boxes collected from the index side for partitioning the
/// data when running out-of-core spatial join
pub max_index_side_bbox_samples: usize, default = 10000
Expand Down
71 changes: 70 additions & 1 deletion rust/sedona-spatial-join/src/planner/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ use datafusion::execution::context::QueryPlanner;
use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};
use datafusion_common::{plan_err, DFSchema, Result};
use datafusion_common::{plan_err, DFSchema, JoinSide, Result};
use datafusion_expr::logical_plan::UserDefinedLogicalNode;
use datafusion_expr::LogicalPlan;
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr::Partitioning;
use datafusion_physical_plan::joins::utils::JoinFilter;
use datafusion_physical_plan::joins::NestedLoopJoinExec;
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::ExecutionPlanProperties;
use sedona_common::sedona_internal_err;

use crate::exec::SpatialJoinExec;
Expand Down Expand Up @@ -152,6 +155,22 @@ impl ExtensionPlanner for SpatialJoinExtensionPlanner {
&& join_type.supports_swap()
&& should_swap_join_order(physical_left.as_ref(), physical_right.as_ref())?;

// Repartition the probe side when enabled. This breaks spatial locality in sorted/skewed
// datasets, leading to more balanced workloads during out-of-core spatial join.
// We determine which pre-swap input will be the probe AFTER any potential swap, and
// repartition it here. swap_inputs() will then carry the RepartitionExec to the correct
// child position.
let (physical_left, physical_right) = if ext.spatial_join.repartition_probe_side {
repartition_probe_side(
physical_left,
physical_right,
&spatial_predicate,
should_swap,
)?
} else {
(physical_left, physical_right)
};

let exec = SpatialJoinExec::try_new(
physical_left,
physical_right,
Expand Down Expand Up @@ -264,3 +283,53 @@ fn logical_join_filter_to_physical(
let join_filter = JoinFilter::new(filter_expr, column_indices, Arc::new(filter_schema));
Ok(join_filter)
}

/// Repartition the probe side of a spatial join using `RoundRobinBatch` partitioning.
///
/// The purpose is to break spatial locality in sorted or skewed datasets, which can cause
/// imbalanced partitions when running out-of-core spatial join. The number of partitions is
/// preserved; only the distribution of rows across partitions is shuffled.
///
/// The `should_swap` parameter indicates whether `swap_inputs()` will be called after
/// `SpatialJoinExec` is constructed. This affects which pre-swap input will become the
/// probe side:
/// - For non-KNN predicates: probe is always `Right` after any swap. If `should_swap` is true,
/// the current `left` will become `right` (probe) after swap, so we repartition `left`.
/// - For KNN predicates: `should_swap` is always false, and the probe side is determined by
/// `KNNPredicate::probe_side`.
fn repartition_probe_side(
mut physical_left: Arc<dyn ExecutionPlan>,
mut physical_right: Arc<dyn ExecutionPlan>,
spatial_predicate: &SpatialPredicate,
should_swap: bool,
) -> Result<(Arc<dyn ExecutionPlan>, Arc<dyn ExecutionPlan>)> {
let probe_plan = match spatial_predicate {
SpatialPredicate::KNearestNeighbors(knn) => match knn.probe_side {
JoinSide::Left => &mut physical_left,
JoinSide::Right => &mut physical_right,
JoinSide::None => {
// KNNPredicate::probe_side is asserted not to be None in its constructor;
// treat this as a debug-only invariant violation and default to right.
debug_assert!(false, "KNNPredicate::probe_side must not be JoinSide::None");
&mut physical_right
}
},
_ => {
// For Relation/Distance predicates, probe is always Right after swap.
// If should_swap, the current left will be moved to the right (probe) by swap_inputs().
if should_swap {
&mut physical_left
} else {
&mut physical_right
}
}
};

let num_partitions = probe_plan.output_partitioning().partition_count();
*probe_plan = Arc::new(RepartitionExec::try_new(
Arc::clone(probe_plan),
Partitioning::RoundRobinBatch(num_partitions),
)?);

Ok((physical_left, physical_right))
}