Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
405 changes: 373 additions & 32 deletions rust/sedona-spatial-join/src/exec.rs

Large diffs are not rendered by default.

36 changes: 34 additions & 2 deletions rust/sedona-spatial-join/src/index/spatial_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ use crate::{
use arrow::array::BooleanBufferBuilder;
use sedona_common::{option::SpatialJoinOptions, sedona_internal_err, ExecutionMode};

pub const DISTANCE_TOLERANCE: f64 = 1e-9;

pub struct SpatialIndex {
pub(crate) schema: SchemaRef,
pub(crate) options: SpatialJoinOptions,
Expand Down Expand Up @@ -213,13 +215,33 @@ impl SpatialIndex {
/// # Returns
///
/// * `JoinResultMetrics` containing the number of actual matches and candidates processed
#[allow(unused)]
pub(crate) fn query_knn(
&self,
probe_wkb: &Wkb,
k: u32,
use_spheroid: bool,
include_tie_breakers: bool,
build_batch_positions: &mut Vec<(i32, i32)>,
) -> Result<QueryResultMetrics> {
self.query_knn_with_distance(
probe_wkb,
k,
use_spheroid,
include_tie_breakers,
build_batch_positions,
None,
)
}

pub(crate) fn query_knn_with_distance(
&self,
probe_wkb: &Wkb,
k: u32,
use_spheroid: bool,
include_tie_breakers: bool,
build_batch_positions: &mut Vec<(i32, i32)>,
mut distances: Option<&mut Vec<f64>>,
) -> Result<QueryResultMetrics> {
if k == 0 {
return Ok(QueryResultMetrics {
Expand Down Expand Up @@ -336,7 +358,7 @@ impl SpatialIndex {
max_y + distance_f32,
);

// Use rtree.search() with envelope bounds (like the old code)
// Use rtree.search() with envelope bounds
let expanded_results = self.rtree.search(min_x, min_y, max_x, max_y);

candidate_count = expanded_results.len();
Expand All @@ -362,7 +384,6 @@ impl SpatialIndex {
.sort_by(|a, b| a.0.partial_cmp(&b.0).unwrap_or(std::cmp::Ordering::Equal));

// Include all results up to and including those with the same distance as the k-th result
const DISTANCE_TOLERANCE: f64 = 1e-9;
let mut tie_breaker_results: Vec<u32> = Vec::new();

for (i, &(distance, result_idx)) in all_distances_with_indices.iter().enumerate() {
Expand Down Expand Up @@ -391,6 +412,17 @@ impl SpatialIndex {
for &result_idx in &final_results {
if (result_idx as usize) < self.data_id_to_batch_pos.len() {
build_batch_positions.push(self.data_id_to_batch_pos[result_idx as usize]);

if let Some(dists) = distances.as_mut() {
let mut dist = f64::NAN;
if let Some(item_geom) = geometry_accessor.get_geometry(result_idx as usize) {
dist = distance_metric
.distance_to_geometry(&probe_geom, item_geom)
.to_f64()
.unwrap_or(f64::NAN);
}
dists.push(dist);
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions rust/sedona-spatial-join/src/partitioning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
use datafusion_common::Result;
use sedona_geometry::bounding_box::BoundingBox;

pub mod broadcast;
pub mod flat;
pub mod kdb;
pub(crate) mod partition_slots;
pub mod round_robin;
pub mod rtree;
pub mod stream_repartitioner;
pub(crate) mod util;
Expand Down
72 changes: 72 additions & 0 deletions rust/sedona-spatial-join/src/partitioning/broadcast.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_common::Result;
use sedona_common::sedona_internal_err;
use sedona_geometry::bounding_box::BoundingBox;

use crate::partitioning::{SpatialPartition, SpatialPartitioner};

/// A partitioner that assigns everything to the Multi partition.
///
/// This partitioner is useful when we want to broadcast the data to all partitions.
pub struct BroadcastPartitioner {
num_partitions: usize,
}

impl BroadcastPartitioner {
pub fn new(num_partitions: usize) -> Self {
Self { num_partitions }
}
}

impl SpatialPartitioner for BroadcastPartitioner {
fn num_regular_partitions(&self) -> usize {
self.num_partitions
}

fn partition(&self, _bbox: &BoundingBox) -> Result<SpatialPartition> {
Ok(SpatialPartition::Multi)
}

fn partition_no_multi(&self, _bbox: &BoundingBox) -> Result<SpatialPartition> {
sedona_internal_err!("BroadcastPartitioner does not support partition_no_multi")
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_broadcast_partitioner() {
let num_partitions = 4;
let partitioner = BroadcastPartitioner::new(num_partitions);
assert_eq!(partitioner.num_regular_partitions(), num_partitions);

let bbox = BoundingBox::xy((0.0, 10.0), (0.0, 10.0));

// Test partition
assert_eq!(
partitioner.partition(&bbox).unwrap(),
SpatialPartition::Multi
);

// Test partition_no_multi
assert!(partitioner.partition_no_multi(&bbox).is_err());
}
}
80 changes: 80 additions & 0 deletions rust/sedona-spatial-join/src/partitioning/round_robin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::atomic::{AtomicUsize, Ordering};

use datafusion_common::Result;
use sedona_geometry::bounding_box::BoundingBox;

use crate::partitioning::{SpatialPartition, SpatialPartitioner};

/// A partitioner that assigns partitions in a round-robin fashion.
///
/// This partitioner is used for KNN join, where the build side is partitioned
/// into `num_partitions` partitions, and the probe side is assigned to the
/// `Multi` partition (i.e., broadcast to all partitions).
pub struct RoundRobinPartitioner {
num_partitions: usize,
counter: AtomicUsize,
}

impl RoundRobinPartitioner {
pub fn new(num_partitions: usize) -> Self {
Self {
num_partitions,
counter: AtomicUsize::new(0),
}
}
}

impl SpatialPartitioner for RoundRobinPartitioner {
fn num_regular_partitions(&self) -> usize {
self.num_partitions
}

fn partition(&self, bbox: &BoundingBox) -> Result<SpatialPartition> {
self.partition_no_multi(bbox)
}

fn partition_no_multi(&self, _bbox: &BoundingBox) -> Result<SpatialPartition> {
let idx = self.counter.fetch_add(1, Ordering::Relaxed);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This round robin partitioner is nondeterministic due to the order of concurrent tasks being scheduled. We will address this by making all partitioners non-sync, so that each async task will have its own partitioner with its own mutable internal state. This will be a relatively large refactoring so I'll leave it to the next PR.

Ok(SpatialPartition::Regular(
(idx % self.num_partitions) as u32,
))
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_round_robin_partitioner() {
let num_partitions = 4;
let partitioner = RoundRobinPartitioner::new(num_partitions);
assert_eq!(partitioner.num_regular_partitions(), num_partitions);

let bbox = BoundingBox::xy((0.0, 10.0), (0.0, 10.0));

for i in 0..10 {
assert_eq!(
partitioner.partition_no_multi(&bbox).unwrap(),
SpatialPartition::Regular((i % num_partitions) as u32)
);
}
}
}
23 changes: 16 additions & 7 deletions rust/sedona-spatial-join/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ use crate::{
SpatialJoinBuildMetrics,
},
partitioning::{
broadcast::BroadcastPartitioner,
flat::FlatPartitioner,
kdb::KDBPartitioner,
round_robin::RoundRobinPartitioner,
stream_repartitioner::{SpilledPartition, SpilledPartitions, StreamRepartitioner},
PartitionedSide, SpatialPartition, SpatialPartitioner,
},
Expand Down Expand Up @@ -243,14 +245,16 @@ impl SpatialJoinComponentsBuilder {
build_partitions: &mut Vec<BuildPartition>,
seed: u64,
) -> Result<Arc<dyn SpatialPartitioner>> {
if matches!(
let build_partitioner: Arc<dyn SpatialPartitioner> = if matches!(
self.spatial_predicate,
SpatialPredicate::KNearestNeighbors(..)
SpatialPredicate::KNearestNeighbors(_)
) {
return sedona_internal_err!("Partitioned KNN join is not supported yet");
}

let build_partitioner: Arc<dyn SpatialPartitioner> = {
// Spatial partitioning does not work well for KNN joins, so we simply use round-robin
// partitioning to spread the indexed data evenly to make each index fit in memory, and
// the probe side will be broadcasted to all partitions by partitioning all of them to
// the Multi partition.
Arc::new(RoundRobinPartitioner::new(num_partitions))
} else {
// Use spatial partitioners to partition the build side and the probe side, this will
// reduce the amount of work needed for probing each partitioned index.
// The KDB partitioner is built using the collected bounding box samples.
Expand Down Expand Up @@ -299,7 +303,12 @@ impl SpatialJoinComponentsBuilder {
num_partitions: usize,
merged_spilled_partitions: &SpilledPartitions,
) -> Result<Arc<dyn SpatialPartitioner>> {
let probe_partitioner: Arc<dyn SpatialPartitioner> = {
let probe_partitioner: Arc<dyn SpatialPartitioner> = if matches!(
self.spatial_predicate,
SpatialPredicate::KNearestNeighbors(_)
) {
Arc::new(BroadcastPartitioner::new(num_partitions))
} else {
// Build a flat partitioner using these partitions
let mut partition_bounds = Vec::with_capacity(num_partitions);
for k in 0..num_partitions {
Expand Down
1 change: 1 addition & 0 deletions rust/sedona-spatial-join/src/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,5 +41,6 @@ impl ProbeStreamMetrics {
}

pub(crate) mod first_pass_stream;
pub(crate) mod knn_results_merger;
pub(crate) mod non_partitioned_stream;
pub(crate) mod partitioned_stream_provider;
Loading