diff --git a/rust/sedona-spatial-join/Cargo.toml b/rust/sedona-spatial-join/Cargo.toml index 902f27f4a..74a250650 100644 --- a/rust/sedona-spatial-join/Cargo.toml +++ b/rust/sedona-spatial-join/Cargo.toml @@ -107,3 +107,8 @@ harness = false name = "stream_repartitioner" path = "bench/partitioning/stream_repartitioner.rs" harness = false + +[[bench]] +name = "flat_vs_rtree" +path = "bench/partitioning/flat_vs_rtree.rs" +harness = false diff --git a/rust/sedona-spatial-join/bench/partitioning/flat.rs b/rust/sedona-spatial-join/bench/partitioning/flat.rs index efe292301..9fa647cde 100644 --- a/rust/sedona-spatial-join/bench/partitioning/flat.rs +++ b/rust/sedona-spatial-join/bench/partitioning/flat.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -mod common; - use std::hint::black_box; -use common::{default_extent, grid_partitions, sample_queries, GRID_DIM, QUERY_BATCH_SIZE}; use criterion::{criterion_group, criterion_main, Criterion, Throughput}; use sedona_spatial_join::partitioning::{flat::FlatPartitioner, SpatialPartitioner}; +use sedona_spatial_join::utils::internal_benchmark_util::{ + default_extent, grid_partitions, sample_queries, GRID_DIM, QUERY_BATCH_SIZE, +}; fn bench_flat_partition_queries(c: &mut Criterion) { let extent = default_extent(); diff --git a/rust/sedona-spatial-join/bench/partitioning/flat_vs_rtree.rs b/rust/sedona-spatial-join/bench/partitioning/flat_vs_rtree.rs new file mode 100644 index 000000000..4c0bceae7 --- /dev/null +++ b/rust/sedona-spatial-join/bench/partitioning/flat_vs_rtree.rs @@ -0,0 +1,86 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Head-to-head benchmark of FlatPartitioner vs RTreePartitioner across +//! varying partition counts to find the optimal switch point. + +use std::hint::black_box; + +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use sedona_spatial_join::partitioning::{ + flat::FlatPartitioner, rtree::RTreePartitioner, SpatialPartitioner, +}; +use sedona_spatial_join::utils::internal_benchmark_util::{ + default_extent, grid_partitions, sample_queries, QUERY_BATCH_SIZE, +}; + +/// Grid dimensions to benchmark. Each produces dim*dim partitions. +/// 4x4=16, 5x5=25, 6x6=36, 8x8=64, 10x10=100, 16x16=256, 20x20=400 +const GRID_DIMS: [usize; 7] = [4, 5, 6, 8, 10, 16, 20]; + +fn bench_flat_vs_rtree(c: &mut Criterion) { + let extent = default_extent(); + + let mut group = c.benchmark_group("flat_vs_rtree"); + group.throughput(Throughput::Elements(QUERY_BATCH_SIZE as u64)); + + for &dim in &GRID_DIMS { + let num_partitions = dim * dim; + let partitions = grid_partitions(&extent, dim); + let queries = sample_queries(&extent, QUERY_BATCH_SIZE); + + let flat = + FlatPartitioner::try_new(partitions.clone()).expect("failed to build FlatPartitioner"); + let rtree = + RTreePartitioner::try_new(partitions).expect("failed to build RTreePartitioner"); + + group.bench_with_input( + BenchmarkId::new("flat", num_partitions), + &flat, + |b, partitioner| { + b.iter(|| { + for query in &queries { + let result = partitioner + .partition(black_box(query)) + .expect("partition failed"); + black_box(result); + } + }); + }, + ); + + group.bench_with_input( + BenchmarkId::new("rtree", num_partitions), + &rtree, + |b, partitioner| { + b.iter(|| { + for query in &queries { + let result = partitioner + .partition(black_box(query)) + .expect("partition failed"); + black_box(result); + } + }); + }, + ); + } + + group.finish(); +} + +criterion_group!(flat_vs_rtree, bench_flat_vs_rtree); +criterion_main!(flat_vs_rtree); diff --git a/rust/sedona-spatial-join/bench/partitioning/rtree.rs b/rust/sedona-spatial-join/bench/partitioning/rtree.rs index ad39c41e8..5f5c461ee 100644 --- a/rust/sedona-spatial-join/bench/partitioning/rtree.rs +++ b/rust/sedona-spatial-join/bench/partitioning/rtree.rs @@ -15,14 +15,14 @@ // specific language governing permissions and limitations // under the License. -mod common; - use std::hint::black_box; -use common::{default_extent, grid_partitions, sample_queries, GRID_DIM, QUERY_BATCH_SIZE}; use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; use sedona_geometry::bounding_box::BoundingBox; use sedona_spatial_join::partitioning::{rtree::RTreePartitioner, SpatialPartitioner}; +use sedona_spatial_join::utils::internal_benchmark_util::{ + default_extent, grid_partitions, sample_queries, GRID_DIM, QUERY_BATCH_SIZE, +}; const NODE_SIZES: [u16; 5] = [4, 8, 16, 32, 64]; // smaller node size => deeper tree fn bench_rtree_partition_queries(c: &mut Criterion) { diff --git a/rust/sedona-spatial-join/src/build_index.rs b/rust/sedona-spatial-join/src/build_index.rs deleted file mode 100644 index 0e2923695..000000000 --- a/rust/sedona-spatial-join/src/build_index.rs +++ /dev/null @@ -1,115 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use std::sync::Arc; - -use arrow_schema::SchemaRef; -use datafusion_common::{DataFusionError, Result}; -use datafusion_execution::{memory_pool::MemoryConsumer, SendableRecordBatchStream, TaskContext}; -use datafusion_expr::JoinType; -use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; -use sedona_common::SedonaOptions; - -use crate::{ - index::{ - BuildSideBatchesCollector, CollectBuildSideMetrics, SpatialIndex, SpatialIndexBuilder, - SpatialJoinBuildMetrics, - }, - spatial_predicate::SpatialPredicate, -}; - -/// Build a spatial index from the build side streams. -/// -/// This function reads the `concurrent_build_side_collection` configuration from the context -/// to determine whether to collect build side partitions concurrently (using spawned tasks) -/// or sequentially (for JNI/embedded contexts without async runtime support). -#[allow(clippy::too_many_arguments)] -pub async fn build_index( - context: Arc, - build_schema: SchemaRef, - build_streams: Vec, - spatial_predicate: SpatialPredicate, - join_type: JoinType, - probe_threads_count: usize, - metrics: ExecutionPlanMetricsSet, - seed: u64, -) -> Result { - log::debug!( - "Building spatial index for running spatial join, seed = {}", - seed - ); - let session_config = context.session_config(); - let sedona_options = session_config - .options() - .extensions - .get::() - .cloned() - .unwrap_or_default(); - let concurrent = sedona_options.spatial_join.concurrent_build_side_collection; - let runtime_env = context.runtime_env(); - let spill_compression = session_config.spill_compression(); - let memory_pool = context.memory_pool(); - let collector = BuildSideBatchesCollector::new( - spatial_predicate.clone(), - sedona_options.spatial_join.clone(), - Arc::clone(&runtime_env), - spill_compression, - ); - let num_partitions = build_streams.len(); - let mut collect_metrics_vec = Vec::with_capacity(num_partitions); - let mut reservations = Vec::with_capacity(num_partitions); - for k in 0..num_partitions { - let consumer = - MemoryConsumer::new(format!("SpatialJoinCollectBuildSide[{k}]")).with_can_spill(true); - let reservation = consumer.register(memory_pool); - reservations.push(reservation); - collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics)); - } - - let build_partitions = collector - .collect_all( - build_streams, - reservations, - collect_metrics_vec, - concurrent, - seed, - ) - .await?; - - let contains_external_stream = build_partitions.iter().any(|partition| { - // Access fields to avoid unused variable warnings. Will be removed when out-of-core - // spatial join (https://github.com/apache/sedona-db/issues/436) is fully implemented. - let _ = partition.num_rows; - let _ = partition.bbox_samples; - let _ = partition.estimated_spatial_index_memory_usage; - partition.build_side_batch_stream.is_external() - }); - if !contains_external_stream { - let mut index_builder = SpatialIndexBuilder::new( - build_schema, - spatial_predicate, - sedona_options.spatial_join, - join_type, - probe_threads_count, - SpatialJoinBuildMetrics::new(0, &metrics), - )?; - index_builder.add_partitions(build_partitions).await?; - index_builder.finish() - } else { - Err(DataFusionError::ResourcesExhausted("Memory limit exceeded while collecting indexed data. External spatial index builder is not yet implemented.".to_string())) - } -} diff --git a/rust/sedona-spatial-join/src/prepare.rs b/rust/sedona-spatial-join/src/prepare.rs index 9e9a2053f..60b7da32b 100644 --- a/rust/sedona-spatial-join/src/prepare.rs +++ b/rust/sedona-spatial-join/src/prepare.rs @@ -44,6 +44,7 @@ use crate::{ flat::FlatPartitioner, kdb::KDBPartitioner, round_robin::RoundRobinPartitioner, + rtree::RTreePartitioner, stream_repartitioner::{SpilledPartition, SpilledPartitions, StreamRepartitioner}, PartitionedSide, SpatialPartition, SpatialPartitioner, }, @@ -296,8 +297,14 @@ impl SpatialJoinComponentsBuilder { Ok(build_partitioner) } - /// Construct a `SpatialPartitioner` (e.g. Flat) from the statistics of partitioned build - /// side for partitioning the probe side. + /// The number of partitions above which the probe side uses an RTree + /// partitioner instead of a flat (linear-scan) partitioner. Benchmarks + /// show the crossover at ~36 partitions; 48 gives a comfortable margin. + const RTREE_PARTITION_THRESHOLD: usize = 48; + + /// Construct a `SpatialPartitioner` for partitioning the probe side. + /// Uses a flat linear-scan partitioner when the number of partitions is + /// small, and switches to an RTree-based partitioner for larger counts. fn create_spatial_partitioner_for_probe_side( &self, num_partitions: usize, @@ -309,7 +316,7 @@ impl SpatialJoinComponentsBuilder { ) { Box::new(BroadcastPartitioner::new(num_partitions)) } else { - // Build a flat partitioner using these partitions + // Collect partition bounding boxes from the spilled partitions let mut partition_bounds = Vec::with_capacity(num_partitions); for k in 0..num_partitions { let partition = SpatialPartition::Regular(k as u32); @@ -320,7 +327,12 @@ impl SpatialJoinComponentsBuilder { .unwrap_or(BoundingBox::empty()); partition_bounds.push(partition_bound); } - Box::new(FlatPartitioner::try_new(partition_bounds)?) + + if num_partitions <= Self::RTREE_PARTITION_THRESHOLD { + Box::new(FlatPartitioner::try_new(partition_bounds)?) + } else { + Box::new(RTreePartitioner::try_new(partition_bounds)?) + } }; Ok(probe_partitioner) } diff --git a/rust/sedona-spatial-join/src/utils.rs b/rust/sedona-spatial-join/src/utils.rs index 4d73a0024..ae7848992 100644 --- a/rust/sedona-spatial-join/src/utils.rs +++ b/rust/sedona-spatial-join/src/utils.rs @@ -19,6 +19,7 @@ pub(crate) mod arrow_utils; pub(crate) mod bbox_sampler; pub(crate) mod disposable_async_cell; pub(crate) mod init_once_array; +pub mod internal_benchmark_util; pub(crate) mod join_utils; pub(crate) mod once_fut; pub(crate) mod spill; diff --git a/rust/sedona-spatial-join/bench/partitioning/common.rs b/rust/sedona-spatial-join/src/utils/internal_benchmark_util.rs similarity index 78% rename from rust/sedona-spatial-join/bench/partitioning/common.rs rename to rust/sedona-spatial-join/src/utils/internal_benchmark_util.rs index 77269565c..569d60068 100644 --- a/rust/sedona-spatial-join/bench/partitioning/common.rs +++ b/rust/sedona-spatial-join/src/utils/internal_benchmark_util.rs @@ -16,8 +16,11 @@ // under the License. //! Shared helpers for partitioner benchmarks. +//! +//! This module is **not** part of the public API and exists only to share +//! utility code across benchmark binaries without resorting to a +//! module-level `#![allow(dead_code)]`. -use rand::{rngs::StdRng, RngExt, SeedableRng}; use sedona_geometry::{bounding_box::BoundingBox, interval::IntervalTrait}; pub const GRID_DIM: usize = 4; // 4x4 grid => 16 partitions like typical workloads @@ -59,22 +62,27 @@ pub fn grid_partitions(extent: &BoundingBox, cells_per_axis: usize) -> Vec Vec { - let mut rng = StdRng::seed_from_u64(RNG_SEED); + let mut rng = fastrand::Rng::with_seed(RNG_SEED); let characteristic_span = extent_span(extent) / 8.0; (0..batch_size) .map(|_| random_bbox(extent, &mut rng, characteristic_span)) .collect() } -fn random_bbox(extent: &BoundingBox, rng: &mut impl RngExt, max_span: f64) -> BoundingBox { +/// Generate a random f64 in `[lo, hi]`. +fn random_f64_range(rng: &mut fastrand::Rng, lo: f64, hi: f64) -> f64 { + lo + rng.f64() * (hi - lo) +} + +fn random_bbox(extent: &BoundingBox, rng: &mut fastrand::Rng, max_span: f64) -> BoundingBox { let (min_x, max_x) = (extent.x().lo(), extent.x().hi()); let (min_y, max_y) = (extent.y().lo(), extent.y().hi()); - let span_x = rng.random_range(0.01..max_span).min(max_x - min_x); - let span_y = rng.random_range(0.01..max_span).min(max_y - min_y); + let span_x = random_f64_range(rng, 0.01, max_span).min(max_x - min_x); + let span_y = random_f64_range(rng, 0.01, max_span).min(max_y - min_y); - let start_x = rng.random_range(min_x..=max_x - span_x); - let start_y = rng.random_range(min_y..=max_y - span_y); + let start_x = random_f64_range(rng, min_x, max_x - span_x); + let start_y = random_f64_range(rng, min_y, max_y - span_y); BoundingBox::xy((start_x, start_x + span_x), (start_y, start_y + span_y)) }