Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,9 @@ fn bench_stream_partitioner(c: &mut Criterion) {
let seed_counter = Arc::clone(&seed_counter);
let schema = Arc::clone(&schema);
let runtime_env = Arc::clone(&runtime_env);
let partitioner = Arc::clone(&partitioner);
let spill_metrics = spill_metrics.clone();
let extent = Arc::clone(&extent);

let partitioner = partitioner.box_clone();
b.iter_batched(
move || {
let seed = seed_counter.fetch_add(1, Ordering::Relaxed);
Expand All @@ -81,7 +80,7 @@ fn bench_stream_partitioner(c: &mut Criterion) {
block_on(async {
StreamRepartitioner::builder(
runtime_env.clone(),
partitioner.clone(),
partitioner.box_clone(),
PartitionedSide::BuildSide,
spill_metrics.clone(),
)
Expand Down Expand Up @@ -187,7 +186,7 @@ fn build_schema() -> Schema {
])
}

fn build_partitioner(extent: &BoundingBox) -> Arc<dyn SpatialPartitioner + Send + Sync> {
fn build_partitioner(extent: &BoundingBox) -> Box<dyn SpatialPartitioner> {
let mut rng = StdRng::seed_from_u64(RNG_SEED ^ 0x00FF_FFFF);
let samples = (0..SAMPLE_FOR_PARTITIONER)
.map(|_| random_bbox(extent, &mut rng))
Expand All @@ -201,7 +200,7 @@ fn build_partitioner(extent: &BoundingBox) -> Arc<dyn SpatialPartitioner + Send
)
.expect("kdb builder should succeed");

Arc::new(partitioner)
Box::new(partitioner)
}

fn random_bbox(extent: &BoundingBox, rng: &mut impl RngExt) -> BoundingBox {
Expand Down
5 changes: 4 additions & 1 deletion rust/sedona-spatial-join/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ pub enum SpatialPartition {
}

/// Partitioning larger-than-memory indexed side to support out-of-core spatial join.
pub trait SpatialPartitioner: Send + Sync {
pub trait SpatialPartitioner: Send {
/// Get the total number of spatial partitions, excluding the None partition and Multi partition.
fn num_regular_partitions(&self) -> usize;

Expand All @@ -68,6 +68,9 @@ pub trait SpatialPartitioner: Send + Sync {
/// Multi partition. If `bbox` intersects with multiple partitions, only one of them will be
/// selected as regular partition.
fn partition_no_multi(&self, bbox: &BoundingBox) -> Result<SpatialPartition>;

/// Clone the partitioner as a boxed trait object.
fn box_clone(&self) -> Box<dyn SpatialPartitioner>;
}

/// Indicates for which side of the spatial join the partitioning is being performed.
Expand Down
5 changes: 5 additions & 0 deletions rust/sedona-spatial-join/src/partitioning/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::partitioning::{SpatialPartition, SpatialPartitioner};
/// This partitioner is useful when we want to broadcast the data to all partitions.
/// Currently it is used for KNN join, where regular spatial partitioning is hard: the distance
/// to the k-th nearest neighbour is not known in advance, so an object cannot be assigned to a
/// bounded set of partitions up front.
#[derive(Clone)]
pub struct BroadcastPartitioner {
/// Partition count held by this partitioner.
// NOTE(review): presumably the number of partitions the data is broadcast to, and the value
// reported by `num_regular_partitions` — neither use is visible here; confirm.
num_partitions: usize,
}
Expand All @@ -48,6 +49,10 @@ impl SpatialPartitioner for BroadcastPartitioner {
// Unsupported operation for the broadcast partitioner: `_bbox` is ignored and the body is a
// single `sedona_internal_err!` invocation carrying the message below.
// NOTE(review): assumes `sedona_internal_err!` evaluates to an `Err(..)` value rather than
// panicking — confirm against the macro definition.
fn partition_no_multi(&self, _bbox: &BoundingBox) -> Result<SpatialPartition> {
sedona_internal_err!("BroadcastPartitioner does not support partition_no_multi")
}

fn box_clone(&self) -> Box<dyn SpatialPartitioner> {
Box::new(self.clone())
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions rust/sedona-spatial-join/src/partitioning/flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use sedona_geometry::interval::IntervalTrait;
use crate::partitioning::{SpatialPartition, SpatialPartitioner};

/// Spatial partitioner that linearly scans partition boundaries.
#[derive(Clone)]
pub struct FlatPartitioner {
/// Partition boundaries that this partitioner scans linearly.
// NOTE(review): presumably the position of a boundary in this vector is its regular partition
// index — the lookup code is not fully visible here; confirm.
boundaries: Vec<BoundingBox>,
}
Expand Down Expand Up @@ -106,6 +107,10 @@ impl SpatialPartitioner for FlatPartitioner {
None => SpatialPartition::None,
})
}

fn box_clone(&self) -> Box<dyn SpatialPartitioner> {
Box::new(self.clone())
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions rust/sedona-spatial-join/src/partitioning/kdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,7 @@ impl KDBTree {
/// let query_bbox = BoundingBox::xy((5.0, 15.0), (5.0, 15.0));
/// let partition = partitioner.partition(&query_bbox).unwrap();
/// ```
#[derive(Clone)]
pub struct KDBPartitioner {
/// Reference-counted handle to the KDB tree; the derived `Clone` copies the handle, not the tree.
tree: Arc<KDBTree>,
}
Expand Down Expand Up @@ -566,6 +567,10 @@ impl SpatialPartitioner for KDBPartitioner {
None => Ok(SpatialPartition::None),
}
}

fn box_clone(&self) -> Box<dyn SpatialPartitioner> {
Box::new(self.clone())
}
}

#[cfg(test)]
Expand Down
14 changes: 10 additions & 4 deletions rust/sedona-spatial-join/src/partitioning/round_robin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::atomic::{AtomicUsize, Ordering};
use std::cell::Cell;

use datafusion_common::Result;
use sedona_geometry::bounding_box::BoundingBox;
Expand All @@ -27,16 +27,17 @@ use crate::partitioning::{SpatialPartition, SpatialPartitioner};
/// This partitioner is used for KNN join, where the build side is partitioned
/// into `num_partitions` partitions, and the probe side is assigned to the
/// `Multi` partition (i.e., broadcast to all partitions).
#[derive(Clone)]
pub struct RoundRobinPartitioner {
/// Modulus applied to the counter when `partition_no_multi` picks a partition.
num_partitions: usize,
// NOTE(review): `counter` is declared twice below (atomic and `Cell`), which cannot compile as
// one struct; this reads like removed and added diff lines shown together — confirm which one
// is live. `AtomicUsize` does not implement `Clone`, so the `#[derive(Clone)]` above is only
// satisfiable with the `Cell<usize>` declaration.
counter: AtomicUsize,
/// Running count of `partition_no_multi` calls; `Cell` allows updating it through `&self`.
/// `Cell` is not `Sync`, so a value of this type cannot be shared by reference across threads.
counter: Cell<usize>,
}

impl RoundRobinPartitioner {
/// Create a partitioner whose counter starts at 0.
///
/// `num_partitions` is stored as given; no validation happens here.
// NOTE(review): a zero `num_partitions` would make the `%` in `partition_no_multi` panic —
// confirm callers never pass 0.
pub fn new(num_partitions: usize) -> Self {
Self {
num_partitions,
// NOTE(review): `counter` is initialised twice below (atomic and `Cell`); a struct literal
// cannot name a field twice, so this looks like removed and added diff lines shown
// together — confirm which one is live.
counter: AtomicUsize::new(0),
counter: Cell::new(0),
}
}
}
Expand All @@ -51,11 +52,16 @@ impl SpatialPartitioner for RoundRobinPartitioner {
}

// Assign the next regular partition in rotation; `_bbox` is ignored.
//
// Each call reads the counter, advances it by one, and returns
// `Regular(previous_count % num_partitions)`.
fn partition_no_multi(&self, _bbox: &BoundingBox) -> Result<SpatialPartition> {
// NOTE(review): the counter is read twice below, once via the atomic `fetch_add` and once via
// `Cell::get`/`set`. Both cannot type-check against a single `counter` field; this looks like
// removed and added diff lines shown together — confirm which one is live.
let idx = self.counter.fetch_add(1, Ordering::Relaxed);
let idx = self.counter.get();
// `wrapping_add` makes the counter roll over to 0 instead of panicking on overflow.
self.counter.set(idx.wrapping_add(1));
Ok(SpatialPartition::Regular(
// `%` panics if `num_partitions` is 0, and `as u32` silently truncates values above
// `u32::MAX`. NOTE(review): nothing visible here rules either case out — confirm.
(idx % self.num_partitions) as u32,
))
}

fn box_clone(&self) -> Box<dyn SpatialPartitioner> {
Box::new(self.clone())
}
}

#[cfg(test)]
Expand Down
75 changes: 57 additions & 18 deletions rust/sedona-spatial-join/src/partitioning/rtree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
//! 4. **None-partition Handling**: If a bbox doesn't intersect any partition boundary, it's assigned
//! to [`SpatialPartition::None`].

use std::sync::Arc;

use datafusion_common::Result;
use geo::Rect;
use geo_index::rtree::{sort::HilbertSort, RTree, RTreeBuilder, RTreeIndex};
Expand All @@ -50,15 +52,9 @@ use crate::partitioning::{SpatialPartition, SpatialPartitioner};
/// This partitioner constructs an RTree index over a set of partition boundaries
/// (rectangles) and uses it to efficiently determine which partition a given
/// bounding box belongs to based on spatial intersection.
#[derive(Clone)]
pub struct RTreePartitioner {
// NOTE(review): the four index fields and the `inner` handle are listed together here, but
// `try_new` builds `Self { inner: .. }` only and the same four fields are declared on
// `RawRTreePartitioner`. This reads like removed and added diff lines shown together —
// confirm which layout is live.
/// The RTree index storing partition boundaries as f32 rectangles
rtree: RTree<f32>,
/// Flat representation of partition boundaries for overlap calculations
boundaries: Vec<Rect<f32>>,
/// Number of partitions (excluding None and Multi)
num_partitions: usize,
/// Map from RTree index to original partition index
partition_map: Vec<usize>,
/// Reference-counted handle to the raw partitioner; the derived `Clone` copies the handle only.
inner: Arc<RawRTreePartitioner>,
}

impl RTreePartitioner {
Expand All @@ -84,12 +80,58 @@ impl RTreePartitioner {
/// let partitioner = RTreePartitioner::try_new(boundaries).unwrap();
/// ```
pub fn try_new(boundaries: Vec<BoundingBox>) -> Result<Self> {
// NOTE(review): the `Self::build(..)` expression below is followed by further statements and
// would also move `boundaries` a second time, so both forms cannot coexist; this looks like
// removed and added diff lines shown together — confirm which one is live.
Self::build(boundaries, None)
// Build the raw partitioner (errors propagate via `?`), then wrap it in an `Arc`.
let inner = RawRTreePartitioner::try_new(boundaries)?;
Ok(Self {
inner: Arc::new(inner),
})
}

/// Create a new RTree partitioner with a custom node size.
///
/// `node_size` is passed to the builder as `Some(node_size)`; any error from the builder is
/// returned to the caller.
pub fn try_new_with_node_size(boundaries: Vec<BoundingBox>, node_size: u16) -> Result<Self> {
// NOTE(review): the `Self::build(..)` expression below is followed by further statements and
// would also move `boundaries` a second time, so both forms cannot coexist; this looks like
// removed and added diff lines shown together — confirm which one is live.
Self::build(boundaries, Some(node_size))
// Build the raw partitioner (errors propagate via `?`), then wrap it in an `Arc`.
let inner = RawRTreePartitioner::build(boundaries, Some(node_size))?;
Ok(Self {
inner: Arc::new(inner),
})
}

/// Return the number of levels in the underlying RTree.
pub fn depth(&self) -> usize {
self.inner.depth()
}
}

impl SpatialPartitioner for RTreePartitioner {
fn num_regular_partitions(&self) -> usize {
self.inner.num_regular_partitions()
}

fn partition(&self, bbox: &BoundingBox) -> Result<SpatialPartition> {
self.inner.partition(bbox)
}

fn partition_no_multi(&self, bbox: &BoundingBox) -> Result<SpatialPartition> {
self.inner.partition_no_multi(bbox)
}

fn box_clone(&self) -> Box<dyn SpatialPartitioner> {
Box::new(self.clone())
}
}

/// RTree index state that `RTreePartitioner` holds behind an `Arc`.
struct RawRTreePartitioner {
/// The RTree index storing partition boundaries as f32 rectangles
rtree: RTree<f32>,
/// Flat representation of partition boundaries for overlap calculations
boundaries: Vec<Rect<f32>>,
/// Number of partitions (excluding None and Multi)
num_partitions: usize,
/// Map from RTree index to original partition index
partition_map: Vec<usize>,
}

impl RawRTreePartitioner {
/// Build a raw partitioner from `boundaries` without a node-size override.
///
/// Equivalent to calling `build` with `None` as the node size; errors from `build` are
/// returned as-is.
fn try_new(boundaries: Vec<BoundingBox>) -> Result<Self> {
    let node_size: Option<u16> = None;
    Self::build(boundaries, node_size)
}

fn build(boundaries: Vec<BoundingBox>, node_size: Option<u16>) -> Result<Self> {
Expand Down Expand Up @@ -122,25 +164,22 @@ impl RTreePartitioner {

let rtree = rtree_builder.finish::<HilbertSort>();

Ok(RTreePartitioner {
Ok(RawRTreePartitioner {
rtree,
boundaries: rects,
num_partitions,
partition_map,
})
}

/// Return the number of levels in the underlying RTree.
///
/// Reads the value straight from the `rtree` field via `num_levels()`.
// NOTE(review): a second `depth` with the same body (without `pub`) appears a few lines further
// down; the two look like the removed and added sides of a diff — confirm which one is live.
pub fn depth(&self) -> usize {
self.rtree.num_levels()
}
}

impl SpatialPartitioner for RTreePartitioner {
// Return the stored `num_partitions` value.
// NOTE(review): presumably the partition count excluding None and Multi, as on the identically
// named field of `RawRTreePartitioner` — the enclosing `impl` header is ambiguous here; confirm.
fn num_regular_partitions(&self) -> usize {
self.num_partitions
}

// Return the number of levels in the RTree held in the `rtree` field, as reported by
// `num_levels()`.
fn depth(&self) -> usize {
self.rtree.num_levels()
}

fn partition(&self, bbox: &BoundingBox) -> Result<SpatialPartition> {
// Convert bbox to f32 for RTree query with proper bounds handling
let (min_x, min_y, max_x, max_y) = match bbox_to_f32_rect(bbox)? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ impl SpilledPartitions {
/// `target_batch_size` rows per partition batch.
pub struct StreamRepartitioner {
runtime_env: Arc<RuntimeEnv>,
partitioner: Arc<dyn SpatialPartitioner>,
partitioner: Box<dyn SpatialPartitioner>,
partitioned_side: PartitionedSide,
slots: PartitionSlots,
/// Spill files for each spatial partition.
Expand Down Expand Up @@ -330,7 +330,7 @@ pub struct StreamRepartitioner {
/// - `spilled_batch_in_memory_size_threshold`: `None`
pub struct StreamRepartitionerBuilder {
runtime_env: Arc<RuntimeEnv>,
partitioner: Arc<dyn SpatialPartitioner>,
partitioner: Box<dyn SpatialPartitioner>,
partitioned_side: PartitionedSide,
spill_compression: SpillCompression,
spill_metrics: SpillMetrics,
Expand Down Expand Up @@ -407,7 +407,7 @@ impl StreamRepartitioner {
/// spill metrics). Optional parameters can then be set on the returned builder.
pub fn builder(
runtime_env: Arc<RuntimeEnv>,
partitioner: Arc<dyn SpatialPartitioner>,
partitioner: Box<dyn SpatialPartitioner>,
partitioned_side: PartitionedSide,
spill_metrics: SpillMetrics,
) -> StreamRepartitionerBuilder {
Expand Down Expand Up @@ -840,7 +840,7 @@ mod tests {
BoundingBox::xy((0.0, 50.0), (0.0, 50.0)),
BoundingBox::xy((50.0, 100.0), (0.0, 50.0)),
];
let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
let partitioner = Box::new(FlatPartitioner::try_new(partitions)?);
let runtime_env = Arc::new(RuntimeEnv::default());
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);

Expand Down Expand Up @@ -926,7 +926,7 @@ mod tests {
BoundingBox::xy((0.0, 50.0), (0.0, 50.0)),
BoundingBox::xy((50.0, 100.0), (0.0, 50.0)),
];
let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
let partitioner = Box::new(FlatPartitioner::try_new(partitions)?);
let runtime_env = Arc::new(RuntimeEnv::default());
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);

Expand Down Expand Up @@ -990,7 +990,7 @@ mod tests {
BoundingBox::xy((0.0, 50.0), (0.0, 50.0)),
BoundingBox::xy((50.0, 100.0), (0.0, 50.0)),
];
let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
let partitioner = Box::new(FlatPartitioner::try_new(partitions)?);
let runtime_env = Arc::new(RuntimeEnv::default());
let spill_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let mut repartitioner = StreamRepartitioner::builder(
Expand Down Expand Up @@ -1035,7 +1035,7 @@ mod tests {
let batch_a = sample_batch(&[0], vec![Some(wkb_point((10.0, 10.0)).unwrap())])?;
let batch_b = sample_batch(&[1], vec![Some(wkb_point((20.0, 10.0)).unwrap())])?;
let partitions = vec![BoundingBox::xy((0.0, 50.0), (0.0, 50.0))];
let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
let partitioner = Box::new(FlatPartitioner::try_new(partitions)?);
let runtime_env = Arc::new(RuntimeEnv::default());
let spill_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let mut repartitioner = StreamRepartitioner::builder(
Expand Down Expand Up @@ -1069,7 +1069,7 @@ mod tests {
let batch_a = sample_batch(&[0], vec![Some(wkb_point((10.0, 10.0)).unwrap())])?;
let batch_b = sample_batch(&[1], vec![Some(wkb_point((20.0, 10.0)).unwrap())])?;
let partitions = vec![BoundingBox::xy((0.0, 50.0), (0.0, 50.0))];
let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
let partitioner = Box::new(FlatPartitioner::try_new(partitions)?);
let runtime_env = Arc::new(RuntimeEnv::default());
let spill_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let mut repartitioner = StreamRepartitioner::builder(
Expand Down
Loading