diff --git a/Cargo.lock b/Cargo.lock index 46f9ff1b6..f99ff5ceb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5404,6 +5404,7 @@ dependencies = [ "geos", "once_cell", "parking_lot", + "pin-project-lite", "rand", "rstest", "sedona-common", diff --git a/Cargo.toml b/Cargo.toml index 99e084e95..6c20d23df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -92,6 +92,7 @@ dirs = "6.0.0" env_logger = "0.11" fastrand = "2.0" futures = "0.3" +pin-project-lite = "0.2" glam = "0.30.10" object_store = { version = "0.12.4", default-features = false } float_next_after = "1" diff --git a/rust/sedona-spatial-join/Cargo.toml b/rust/sedona-spatial-join/Cargo.toml index b4c8f630c..9831c59b1 100644 --- a/rust/sedona-spatial-join/Cargo.toml +++ b/rust/sedona-spatial-join/Cargo.toml @@ -45,6 +45,7 @@ datafusion-physical-plan = { workspace = true } datafusion-execution = { workspace = true } datafusion-common-runtime = { workspace = true } futures = { workspace = true } +pin-project-lite = { workspace = true } once_cell = { workspace = true } parking_lot = { workspace = true } geo = { workspace = true } @@ -89,3 +90,13 @@ harness = false name = "flat" path = "bench/partitioning/flat.rs" harness = false + +[[bench]] +name = "evaluated_batch_spill" +path = "bench/evaluated_batch/spill.rs" +harness = false + +[[bench]] +name = "external_evaluated_batch_stream" +path = "bench/evaluated_batch/external_evaluated_batch_stream.rs" +harness = false diff --git a/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs b/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs new file mode 100644 index 000000000..950502be0 --- /dev/null +++ b/rust/sedona-spatial-join/bench/evaluated_batch/external_evaluated_batch_stream.rs @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint::black_box; +use std::sync::Arc; + +use arrow_array::{Int32Array, RecordBatch, StringArray}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use datafusion::config::SpillCompression; +use datafusion_common::ScalarValue; +use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_expr::ColumnarValue; +use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics}; +use futures::StreamExt; +use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY, WKB_VIEW_GEOMETRY}; +use sedona_spatial_join::evaluated_batch::evaluated_batch_stream::external::ExternalEvaluatedBatchStream; +use sedona_spatial_join::evaluated_batch::spill::EvaluatedBatchSpillWriter; +use sedona_spatial_join::evaluated_batch::EvaluatedBatch; +use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray; +use sedona_testing::create::create_array_storage; + +const ROWS: usize = 8192; +const BATCHES_PER_FILE: usize = 64; + +fn make_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])) +} + +fn make_evaluated_batch(num_rows: usize, sedona_type: &SedonaType) -> EvaluatedBatch { + let schema = make_schema(); + let ids: Vec = (0..num_rows).map(|v| v as i32).collect(); + let id_array = Arc::new(Int32Array::from(ids)); + let name_array = Arc::new(StringArray::from(vec![Some("Alice"); num_rows])); + let batch = RecordBatch::try_new(schema, vec![id_array, name_array]) + .expect("failed to build record batch for benchmark"); + + // Use sedona-testing helpers so this benchmark stays focused on spill + stream I/O. + // This builds either a Binary (WKB_GEOMETRY) or BinaryView (WKB_VIEW_GEOMETRY) array. + let wkt_values = vec![Some("POINT (0 0)"); num_rows]; + let geom_array = create_array_storage(&wkt_values, sedona_type); + + let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, sedona_type) + .expect("failed to build geometry array for benchmark"); + + geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0)))); + + EvaluatedBatch { batch, geom_array } +} + +fn write_spill_file( + env: Arc, + schema: SchemaRef, + metrics_set: &ExecutionPlanMetricsSet, + sedona_type: &SedonaType, + compression: SpillCompression, + evaluated_batch: &EvaluatedBatch, +) -> Arc { + let metrics = SpillMetrics::new(metrics_set, 0); + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + sedona_type, + "bench_external_stream", + compression, + metrics, + None, + ) + .expect("failed to create spill writer for benchmark"); + + for _ in 0..BATCHES_PER_FILE { + writer + .append(evaluated_batch) + .expect("failed to append batch in benchmark"); + } + + Arc::new(writer.finish().expect("failed to finish spill writer")) +} + +fn bench_external_evaluated_batch_stream(c: &mut Criterion) { + let env = Arc::new(RuntimeEnv::default()); + let schema = make_schema(); + let metrics_set = ExecutionPlanMetricsSet::new(); + + let compressions = [ + ("uncompressed", SpillCompression::Uncompressed), + ("lz4", SpillCompression::Lz4Frame), + ]; + + let runtime = tokio::runtime::Builder::new_current_thread() + .build() + .expect("failed to create tokio runtime"); + + for (label, sedona_type) in [("wkb", WKB_GEOMETRY), ("wkb_view", WKB_VIEW_GEOMETRY)] { + let evaluated_batch = make_evaluated_batch(ROWS, &sedona_type); + + for (compression_label, compression) in compressions { + let spill_file = write_spill_file( + Arc::clone(&env), + Arc::clone(&schema), + &metrics_set, + &sedona_type, + compression, + &evaluated_batch, + ); + + let mut group = c.benchmark_group(format!( + "external_evaluated_batch_stream/{label}/{compression_label}" + )); + group.throughput(Throughput::Elements((ROWS * BATCHES_PER_FILE) as u64)); + + group.bench_with_input( + BenchmarkId::new( + "external_stream", + format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"), + ), + &spill_file, + |b, file| { + b.iter(|| { + runtime.block_on(async { + let stream = + ExternalEvaluatedBatchStream::try_from_spill_file(Arc::clone(file)) + .expect("failed to create external evaluated batch stream"); + futures::pin_mut!(stream); + + let mut rows = 0usize; + while let Some(batch) = stream.next().await { + let batch = + batch.expect("failed to read evaluated batch from stream"); + rows += batch.num_rows(); + black_box(batch); + } + black_box(rows); + }) + }) + }, + ); + + group.finish(); + } + } +} + +criterion_group!( + external_evaluated_batch_stream, + bench_external_evaluated_batch_stream +); +criterion_main!(external_evaluated_batch_stream); diff --git a/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs b/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs new file mode 100644 index 000000000..b87a6b9c1 --- /dev/null +++ b/rust/sedona-spatial-join/bench/evaluated_batch/spill.rs @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::hint::black_box; +use std::sync::Arc; + +use arrow_array::{Int32Array, RecordBatch, StringArray}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput}; +use datafusion::config::SpillCompression; +use datafusion_common::ScalarValue; +use datafusion_execution::runtime_env::RuntimeEnv; +use datafusion_expr::ColumnarValue; +use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics}; +use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY, WKB_VIEW_GEOMETRY}; +use sedona_spatial_join::evaluated_batch::spill::{ + EvaluatedBatchSpillReader, EvaluatedBatchSpillWriter, +}; +use sedona_spatial_join::evaluated_batch::EvaluatedBatch; +use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray; +use sedona_testing::create::create_array_storage; + +const ROWS: usize = 8192; +const BATCHES_PER_FILE: usize = 64; + +fn make_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])) +} + +fn make_evaluated_batch(num_rows: usize, sedona_type: &SedonaType) -> EvaluatedBatch { + let schema = make_schema(); + let ids: Vec = (0..num_rows).map(|v| v as i32).collect(); + let id_array = Arc::new(Int32Array::from(ids)); + let name_array = Arc::new(StringArray::from(vec![Some("Alice"); num_rows])); + let batch = RecordBatch::try_new(schema, vec![id_array, name_array]) + .expect("failed to build record batch for benchmark"); + + // Use sedona-testing helpers so this benchmark stays focused on spill I/O. + // This builds either a Binary (WKB_GEOMETRY) or BinaryView (WKB_VIEW_GEOMETRY) array. + let wkt_values = vec![Some("POINT (0 0)"); num_rows]; + let geom_array = create_array_storage(&wkt_values, sedona_type); + + let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, sedona_type) + .expect("failed to build geometry array for benchmark"); + + // Use a scalar distance so the spilled dist column is constant. + geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0)))); + + EvaluatedBatch { batch, geom_array } +} + +fn write_spill_file( + env: Arc, + schema: SchemaRef, + metrics_set: &ExecutionPlanMetricsSet, + sedona_type: &SedonaType, + compression: SpillCompression, + evaluated_batch: &EvaluatedBatch, +) -> Arc { + let metrics = SpillMetrics::new(metrics_set, 0); + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + sedona_type, + "bench_spill", + compression, + metrics, + None, + ) + .expect("failed to create spill writer for benchmark"); + + for _ in 0..BATCHES_PER_FILE { + writer + .append(evaluated_batch) + .expect("failed to append batch in benchmark"); + } + + Arc::new(writer.finish().expect("failed to finish spill writer")) +} + +fn bench_spill_writer_and_reader(c: &mut Criterion) { + let env = Arc::new(RuntimeEnv::default()); + let schema = make_schema(); + let metrics_set = ExecutionPlanMetricsSet::new(); + + let compressions = [ + ("uncompressed", SpillCompression::Uncompressed), + ("lz4", SpillCompression::Lz4Frame), + ]; + + for (label, sedona_type) in [("wkb", WKB_GEOMETRY), ("wkb_view", WKB_VIEW_GEOMETRY)] { + let evaluated_batch = make_evaluated_batch(ROWS, &sedona_type); + + for (compression_label, compression) in compressions { + // Prepare a stable spill file for read benchmarks. + let spill_file = write_spill_file( + Arc::clone(&env), + Arc::clone(&schema), + &metrics_set, + &sedona_type, + compression, + &evaluated_batch, + ); + + let mut group = + c.benchmark_group(format!("evaluated_batch_spill/{label}/{compression_label}")); + group.throughput(Throughput::Elements((ROWS * BATCHES_PER_FILE) as u64)); + + group.bench_with_input( + BenchmarkId::new( + "spill_writer", + format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"), + ), + &evaluated_batch, + |b, batch| { + b.iter(|| { + let metrics = SpillMetrics::new(&metrics_set, 0); + let mut writer = EvaluatedBatchSpillWriter::try_new( + Arc::clone(&env), + Arc::clone(&schema), + &sedona_type, + "bench_spill", + compression, + metrics, + None, + ) + .expect("failed to create spill writer"); + + for _ in 0..BATCHES_PER_FILE { + writer.append(black_box(batch)).unwrap(); + } + + let file = writer.finish().unwrap(); + black_box(file); + }) + }, + ); + + group.bench_with_input( + BenchmarkId::new( + "spill_reader", + format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"), + ), + &spill_file, + |b, file| { + b.iter(|| { + let mut reader = + EvaluatedBatchSpillReader::try_new(black_box(file.as_ref())) + .expect("failed to create spill reader"); + let mut rows = 0usize; + + while let Some(batch) = reader.next_batch() { + let batch = batch.expect("failed to read evaluated batch"); + rows += batch.num_rows(); + black_box(batch); + } + + black_box(rows); + }) + }, + ); + + group.bench_with_input( + BenchmarkId::new( + "spill_reader_raw", + format!("rows_{ROWS}_batches_{BATCHES_PER_FILE}"), + ), + &spill_file, + |b, file| { + b.iter(|| { + let mut reader = + EvaluatedBatchSpillReader::try_new(black_box(file.as_ref())) + .expect("failed to create spill reader"); + let mut rows = 0usize; + + while let Some(batch) = reader.next_raw_batch() { + let batch = batch.expect("failed to read record batch"); + rows += batch.num_rows(); + black_box(batch); + } + + black_box(rows); + }) + }, + ); + + group.finish(); + } + } +} + +criterion_group!(evaluated_batch_spill, bench_spill_writer_and_reader); +criterion_main!(evaluated_batch_spill); diff --git a/rust/sedona-spatial-join/src/evaluated_batch.rs b/rust/sedona-spatial-join/src/evaluated_batch.rs index d44d49ec3..aad3f11b0 100644 --- a/rust/sedona-spatial-join/src/evaluated_batch.rs +++ b/rust/sedona-spatial-join/src/evaluated_batch.rs @@ -27,7 +27,7 @@ use crate::{ /// EvaluatedBatch contains the original record batch from the input stream and the evaluated /// geometry array. -pub(crate) struct EvaluatedBatch { +pub struct EvaluatedBatch { /// Original record batch polled from the stream pub batch: RecordBatch, /// Evaluated geometry array, containing the geometry array containing geometries to be joined, @@ -65,4 +65,5 @@ impl EvaluatedBatch { } } -pub(crate) mod evaluated_batch_stream; +pub mod evaluated_batch_stream; +pub mod spill; diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs index eb1f855ca..c18761d26 100644 --- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs +++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs @@ -39,4 +39,5 @@ pub(crate) trait EvaluatedBatchStream: Stream> { pub(crate) type SendableEvaluatedBatchStream = Pin>; pub(crate) mod evaluate; +pub mod external; pub(crate) mod in_mem; diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs index 0baf3c5f5..f6e185c30 100644 --- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs +++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use std::task::{Context, Poll}; use arrow_array::RecordBatch; -use arrow_schema::{DataType, SchemaRef}; use datafusion_common::Result; use datafusion_physical_plan::{metrics, SendableRecordBatchStream}; use futures::{Stream, StreamExt}; @@ -30,7 +29,7 @@ use crate::evaluated_batch::{ EvaluatedBatch, }; use crate::operand_evaluator::{EvaluatedGeometryArray, OperandEvaluator}; -use crate::utils::arrow_utils::compact_batch; +use crate::utils::arrow_utils::{compact_batch, schema_contains_view_types}; /// An evaluator that can evaluate geometry expressions on record batches /// and produces evaluated geometry arrays. @@ -86,14 +85,6 @@ impl EvaluateOperandBatchStream { } } -/// Checks if the schema contains any view types (Utf8View or BinaryView). -fn schema_contains_view_types(schema: &SchemaRef) -> bool { - schema - .flattened_fields() - .iter() - .any(|field| matches!(field.data_type(), DataType::Utf8View | DataType::BinaryView)) -} - impl EvaluatedBatchStream for EvaluateOperandBatchStream { fn is_external(&self) -> bool { false @@ -160,58 +151,3 @@ pub(crate) fn create_evaluated_probe_stream( false, )) } - -#[cfg(test)] -mod tests { - use std::sync::Arc; - - use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; - - use super::schema_contains_view_types; - - fn schema(fields: Vec) -> SchemaRef { - Arc::new(Schema::new(fields)) - } - - #[test] - fn test_schema_contains_view_types_top_level() { - let schema_ref = schema(vec![ - Field::new("a", DataType::Utf8View, true), - Field::new("b", DataType::BinaryView, true), - ]); - - assert!(schema_contains_view_types(&schema_ref)); - - // Similar shape but without view types - let schema_no_view = schema(vec![ - Field::new("a", DataType::Utf8, true), - Field::new("b", DataType::Binary, true), - ]); - assert!(!schema_contains_view_types(&schema_no_view)); - } - - #[test] - fn test_schema_contains_view_types_nested() { - let nested = Field::new( - "s", - DataType::Struct(Fields::from(vec![Field::new( - "v", - DataType::Utf8View, - true, - )])), - true, - ); - - let schema_ref = schema(vec![nested]); - assert!(schema_contains_view_types(&schema_ref)); - - // Nested struct without any view types - let nested_no_view = Field::new( - "s", - DataType::Struct(Fields::from(vec![Field::new("v", DataType::Utf8, true)])), - true, - ); - let schema_no_view = schema(vec![nested_no_view]); - assert!(!schema_contains_view_types(&schema_no_view)); - } -} diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs new file mode 100644 index 000000000..67d3538ee --- /dev/null +++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/external.rs @@ -0,0 +1,638 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + collections::VecDeque, + iter, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use arrow_array::RecordBatch; +use arrow_schema::{Schema, SchemaRef}; +use datafusion_common::{DataFusionError, Result}; +use datafusion_common_runtime::SpawnedTask; +use datafusion_execution::{ + disk_manager::RefCountedTempFile, RecordBatchStream, SendableRecordBatchStream, +}; +use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder; +use futures::{FutureExt, StreamExt}; +use pin_project_lite::pin_project; +use sedona_common::sedona_internal_err; + +use crate::evaluated_batch::{ + evaluated_batch_stream::EvaluatedBatchStream, + spill::{ + spilled_batch_to_evaluated_batch, spilled_schema_to_evaluated_schema, + EvaluatedBatchSpillReader, + }, + EvaluatedBatch, +}; + +const RECORD_BATCH_CHANNEL_CAPACITY: usize = 2; + +pin_project! { + /// Streams [`EvaluatedBatch`] values read back from on-disk spill files. + /// + /// This stream is intended for the “spilled” path where batches have been written to disk and + /// must be read back into memory. It wraps an [`ExternalRecordBatchStream`] and uses + /// background tasks to prefetch/forward batches so downstream operators can process a batch + /// while the next one is being loaded. + pub struct ExternalEvaluatedBatchStream { + #[pin] + inner: RecordBatchToEvaluatedStream, + schema: SchemaRef, + } +} + +enum State { + AwaitingFile, + Opening(SpawnedTask>), + Reading(SpawnedTask<(EvaluatedBatchSpillReader, Option>)>), + Finished, +} + +impl ExternalEvaluatedBatchStream { + /// Creates an external stream from a single spill file. + pub fn try_from_spill_file(spill_file: Arc) -> Result { + let record_stream = + ExternalRecordBatchStream::try_from_spill_files(iter::once(spill_file))?; + let evaluated_stream = + RecordBatchToEvaluatedStream::try_spawned_evaluated_stream(Box::pin(record_stream))?; + let schema = evaluated_stream.schema(); + Ok(Self { + inner: evaluated_stream, + schema, + }) + } + + /// Creates an external stream from multiple spill files. + /// + /// The stream yields the batches from each file in order. When `spill_files` is empty the + /// stream is empty (returns `None` immediately) and no schema validation is performed. + pub fn try_from_spill_files(schema: SchemaRef, spill_files: I) -> Result + where + I: IntoIterator>, + { + let record_stream = ExternalRecordBatchStream::try_from_spill_files(spill_files)?; + if !record_stream.is_empty() { + // `ExternalRecordBatchStream` only has a meaningful schema when at least one spill + // file is provided. In that case, validate that the caller-provided evaluated schema + // matches what would be derived from the spilled schema. + let actual_schema = spilled_schema_to_evaluated_schema(&record_stream.schema())?; + if schema != actual_schema { + return sedona_internal_err!( + "Schema mismatch when creating ExternalEvaluatedBatchStream" + ); + } + } + let evaluated_stream = + RecordBatchToEvaluatedStream::try_spawned_evaluated_stream(Box::pin(record_stream))?; + Ok(Self { + inner: evaluated_stream, + schema, + }) + } +} + +impl EvaluatedBatchStream for ExternalEvaluatedBatchStream { + fn is_external(&self) -> bool { + true + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +impl futures::Stream for ExternalEvaluatedBatchStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().inner.poll_next(cx) + } +} + +pin_project! { + /// Adapts a [`RecordBatchStream`] containing spilled batches into an [`EvaluatedBatch`] stream. + /// + /// Each incoming `RecordBatch` is decoded via [`spilled_batch_to_evaluated_batch`]. This type + /// also carries the derived evaluated schema for downstream consumers. + struct RecordBatchToEvaluatedStream { + #[pin] + inner: SendableRecordBatchStream, + evaluated_schema: SchemaRef, + } +} + +impl RecordBatchToEvaluatedStream { + fn try_new(inner: SendableRecordBatchStream) -> Result { + let evaluated_schema = spilled_schema_to_evaluated_schema(&inner.schema())?; + Ok(Self { + inner, + evaluated_schema, + }) + } + + /// Buffers `record_stream` by forwarding it through a bounded channel. + /// + /// This is primarily useful for [`ExternalRecordBatchStream`], where producing the next batch + /// may involve disk I/O and `spawn_blocking` work. By polling the source stream in a spawned + /// task, we can overlap “load next batch” with “process current batch”, while still applying + /// backpressure via [`RECORD_BATCH_CHANNEL_CAPACITY`]. + /// + /// The forwarding task stops when the receiver is dropped or when the source stream yields its + /// first error. + fn try_spawned_evaluated_stream(record_stream: SendableRecordBatchStream) -> Result { + let schema = record_stream.schema(); + let mut builder = + RecordBatchReceiverStreamBuilder::new(schema, RECORD_BATCH_CHANNEL_CAPACITY); + let tx = builder.tx(); + builder.spawn(async move { + let mut record_stream = record_stream; + while let Some(batch) = record_stream.next().await { + let is_err = batch.is_err(); + if tx.send(batch).await.is_err() { + break; + } + if is_err { + break; + } + } + Ok(()) + }); + + let buffered = builder.build(); + Self::try_new(buffered) + } + + fn schema(&self) -> SchemaRef { + Arc::clone(&self.evaluated_schema) + } +} + +impl futures::Stream for RecordBatchToEvaluatedStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + match this.inner.as_mut().poll_next(cx) { + Poll::Ready(Some(Ok(batch))) => { + Poll::Ready(Some(spilled_batch_to_evaluated_batch(batch))) + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +/// Streams raw [`RecordBatch`] values directly from spill files. +/// +/// This is the lowest-level “read from disk” stream: it opens each spill file, reads the stored +/// record batches sequentially, and yields them without decoding into [`EvaluatedBatch`]. +/// +/// Schema handling: +/// - If at least one spill file is provided, the stream schema is taken from the first file. +/// - If no files are provided, the schema is empty and the stream terminates immediately. +pub(crate) struct ExternalRecordBatchStream { + schema: SchemaRef, + state: State, + spill_files: VecDeque>, + is_empty: bool, +} + +impl ExternalRecordBatchStream { + /// Creates a stream over `spill_files`, yielding all batches from each file in order. + /// + /// This function assumes all spill files were written with a compatible schema. + pub fn try_from_spill_files(spill_files: I) -> Result + where + I: IntoIterator>, + { + let spill_files = spill_files.into_iter().collect::>(); + let (schema, is_empty) = match spill_files.front() { + Some(file) => { + let reader = EvaluatedBatchSpillReader::try_new(file)?; + (reader.schema(), false) + } + None => (Arc::new(Schema::empty()), true), + }; + Ok(Self { + schema, + state: State::AwaitingFile, + spill_files, + is_empty, + }) + } + + pub fn is_empty(&self) -> bool { + self.is_empty + } +} + +impl RecordBatchStream for ExternalRecordBatchStream { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } +} + +impl futures::Stream for ExternalRecordBatchStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let self_mut = self.get_mut(); + + loop { + match &mut self_mut.state { + State::AwaitingFile => match self_mut.spill_files.pop_front() { + Some(spill_file) => { + let task = SpawnedTask::spawn_blocking(move || { + EvaluatedBatchSpillReader::try_new(&spill_file) + }); + self_mut.state = State::Opening(task); + } + None => { + self_mut.state = State::Finished; + return Poll::Ready(None); + } + }, + State::Opening(task) => match futures::ready!(task.poll_unpin(cx)) { + Err(e) => { + self_mut.state = State::Finished; + return Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))); + } + Ok(Err(e)) => { + self_mut.state = State::Finished; + return Poll::Ready(Some(Err(e))); + } + Ok(Ok(mut spill_reader)) => { + let task = SpawnedTask::spawn_blocking(move || { + let next_batch = spill_reader.next_raw_batch(); + (spill_reader, next_batch) + }); + self_mut.state = State::Reading(task); + } + }, + State::Reading(task) => match futures::ready!(task.poll_unpin(cx)) { + Err(e) => { + self_mut.state = State::Finished; + return Poll::Ready(Some(Err(DataFusionError::External(Box::new(e))))); + } + Ok((_, None)) => { + self_mut.state = State::AwaitingFile; + continue; + } + Ok((_, Some(Err(e)))) => { + self_mut.state = State::Finished; + return Poll::Ready(Some(Err(e))); + } + Ok((mut spill_reader, Some(Ok(batch)))) => { + let task = SpawnedTask::spawn_blocking(move || { + let next_batch = spill_reader.next_raw_batch(); + (spill_reader, next_batch) + }); + self_mut.state = State::Reading(task); + return Poll::Ready(Some(Ok(batch))); + } + }, + State::Finished => { + return Poll::Ready(None); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::evaluated_batch::spill::EvaluatedBatchSpillWriter; + use crate::operand_evaluator::EvaluatedGeometryArray; + use arrow_array::{Array, ArrayRef, BinaryArray, Int32Array, RecordBatch, StringArray}; + use arrow_schema::{DataType, Field, Schema, SchemaRef}; + use datafusion::config::SpillCompression; + use datafusion_common::{Result, ScalarValue}; + use datafusion_execution::runtime_env::RuntimeEnv; + use datafusion_expr::ColumnarValue; + use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics}; + use futures::StreamExt; + use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY}; + use std::sync::Arc; + + fn create_test_runtime_env() -> Result> { + Ok(Arc::new(RuntimeEnv::default())) + } + + fn create_test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])) + } + + fn create_test_record_batch(start_id: i32) -> Result { + let schema = create_test_schema(); + let id_array = Arc::new(Int32Array::from(vec![start_id, start_id + 1, start_id + 2])); + let name_array = Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob"), None])); + RecordBatch::try_new(schema, vec![id_array, name_array]).map_err(|e| e.into()) + } + + fn create_test_geometry_array() -> Result<(ArrayRef, SedonaType)> { + // Create WKB encoded points (simple binary data for testing) + let point1_wkb: Vec = vec![ + 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 240, 63, 0, 0, 0, 0, 0, 0, 0, 64, + ]; + let point2_wkb: Vec = vec![ + 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 16, 64, + ]; + let point3_wkb: Vec = vec![ + 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0, 0, 0, 0, 0, 24, 64, + ]; + + let sedona_type = WKB_GEOMETRY; + let geom_array: ArrayRef = Arc::new(BinaryArray::from(vec![ + Some(point1_wkb.as_slice()), + Some(point2_wkb.as_slice()), + Some(point3_wkb.as_slice()), + ])); + + Ok((geom_array, sedona_type)) + } + + fn create_test_evaluated_batch(start_id: i32) -> Result { + let batch = create_test_record_batch(start_id)?; + let (geom_array, sedona_type) = create_test_geometry_array()?; + let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?; + + // Add distance as a scalar value + geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0)))); + + Ok(EvaluatedBatch { batch, geom_array }) + } + + async fn create_spill_file_with_batches(num_batches: usize) -> Result { + let env = create_test_runtime_env()?; + let schema = create_test_schema(); + let sedona_type = WKB_GEOMETRY; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + &sedona_type, + "test_external_stream", + SpillCompression::Uncompressed, + metrics, + None, + )?; + + for i in 0..num_batches { + let batch = create_test_evaluated_batch((i * 3) as i32)?; + writer.append(&batch)?; + } + + writer.finish() + } + + #[tokio::test] + async fn test_external_stream_creation() -> Result<()> { + let spill_file = create_spill_file_with_batches(1).await?; + let stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?; + + assert!(stream.is_external()); + assert_eq!(stream.schema(), create_test_schema()); + + Ok(()) + } + + #[tokio::test] + async fn test_external_stream_single_batch() -> Result<()> { + let spill_file = create_spill_file_with_batches(1).await?; + let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?; + + let batch = stream.next().await.unwrap()?; + assert_eq!(batch.num_rows(), 3); + + // Polling again should still return None + assert!(stream.next().await.is_none()); + assert!(stream.next().await.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_external_stream_large_number_of_batches() -> Result<()> { + let num_batches = 100; + let spill_file = create_spill_file_with_batches(num_batches).await?; + let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?; + + let mut count = 0; + while let Some(batch_result) = stream.next().await { + let batch = batch_result?; + assert_eq!(batch.num_rows(), 3); + count += 1; + } + + assert_eq!(count, num_batches); + + Ok(()) + } + + #[tokio::test] + async fn test_external_stream_is_external_flag() -> Result<()> { + let spill_file = create_spill_file_with_batches(1).await?; + let stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?; + + // Verify the is_external flag returns true + assert!(stream.is_external()); + + Ok(()) + } + + #[tokio::test] + async fn test_external_stream_concurrent_access() -> Result<()> { + // Create multiple streams reading from different files + let file1 = create_spill_file_with_batches(2).await?; + let file2 = create_spill_file_with_batches(3).await?; + + let mut stream1 = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(file1))?; + let mut stream2 = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(file2))?; + + // Read from both streams + let batch1_1 = stream1.next().await.unwrap()?; + let batch2_1 = stream2.next().await.unwrap()?; + + assert_eq!(batch1_1.num_rows(), 3); + assert_eq!(batch2_1.num_rows(), 3); + + // Continue reading + let batch1_2 = stream1.next().await.unwrap()?; + let batch2_2 = stream2.next().await.unwrap()?; + + assert_eq!(batch1_2.num_rows(), 3); + assert_eq!(batch2_2.num_rows(), 3); + + // Stream1 should be done, stream2 should have one more + assert!(stream1.next().await.is_none()); + let batch2_3 = stream2.next().await.unwrap()?; + assert_eq!(batch2_3.num_rows(), 3); + assert!(stream2.next().await.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_external_stream_multiple_spill_files() -> Result<()> { + let file1 = create_spill_file_with_batches(2).await?; + let file2 = create_spill_file_with_batches(3).await?; + let schema = create_test_schema(); + let mut stream = ExternalEvaluatedBatchStream::try_from_spill_files( + Arc::clone(&schema), + vec![Arc::new(file1), Arc::new(file2)], + )?; + + assert_eq!(stream.schema(), schema); + + let mut batches_read = 0; + while let Some(batch_result) = stream.next().await { + let batch = batch_result?; + assert_eq!(batch.num_rows(), 3); + batches_read += 1; + } + + assert_eq!(batches_read, 5); + + Ok(()) + } + + #[tokio::test] + async fn test_external_stream_empty_file() -> Result<()> { + let spill_file = create_spill_file_with_batches(0).await?; + let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?; + + // Should immediately return None + let batch = stream.next().await; + assert!(batch.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn test_external_stream_empty_spill_file_list() -> Result<()> { + let schema = create_test_schema(); + let stream = ExternalEvaluatedBatchStream::try_from_spill_files( + Arc::clone(&schema), + Vec::>::new(), + )?; + + assert_eq!(stream.schema(), schema); + + Ok(()) + } + + #[tokio::test] + async fn test_external_stream_preserves_data() -> Result<()> { + let spill_file = create_spill_file_with_batches(1).await?; + let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?; + + let batch = stream.next().await.unwrap()?; + + // Verify data preservation + let id_array = batch + .batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(id_array.value(0), 0); + assert_eq!(id_array.value(1), 1); + assert_eq!(id_array.value(2), 2); + + let name_array = batch + .batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(name_array.value(0), "Alice"); + assert_eq!(name_array.value(1), "Bob"); + assert!(name_array.is_null(2)); + + // Verify geometry array + assert_eq!(batch.geom_array.rects.len(), 3); + + // Verify distance + match &batch.geom_array.distance { + Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) => { + assert_eq!(*val, 10.0); + } + _ => panic!("Expected scalar distance value"), + } + + Ok(()) + } + + #[tokio::test] + async fn test_external_stream_multiple_batches() -> Result<()> { + let num_batches = 5; + let spill_file = create_spill_file_with_batches(num_batches).await?; + let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?; + + let mut batches_read = 0; + while let Some(batch_result) = stream.next().await { + let batch = batch_result?; + assert_eq!(batch.num_rows(), 3); + + // Verify the ID starts at the expected value + let id_array = batch + .batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let expected_start_id = (batches_read * 3) as i32; + assert_eq!(id_array.value(0), expected_start_id); + + batches_read += 1; + } + + assert_eq!(batches_read, num_batches); + + Ok(()) + } + + #[tokio::test] + async fn test_external_stream_poll_after_completion() -> Result<()> { + let spill_file = create_spill_file_with_batches(1).await?; + let mut stream = ExternalEvaluatedBatchStream::try_from_spill_file(Arc::new(spill_file))?; + + // Read the batch + let _ = stream.next().await.unwrap()?; + + // Should return None + assert!(stream.next().await.is_none()); + + // Polling again should still return None + assert!(stream.next().await.is_none()); + assert!(stream.next().await.is_none()); + + Ok(()) + } +} diff --git a/rust/sedona-spatial-join/src/evaluated_batch/spill.rs b/rust/sedona-spatial-join/src/evaluated_batch/spill.rs new file mode 100644 index 000000000..9d5dd8a84 --- /dev/null +++ b/rust/sedona-spatial-join/src/evaluated_batch/spill.rs @@ -0,0 +1,806 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use arrow::array::Float64Array; +use arrow_array::{Array, RecordBatch, StructArray}; +use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; +use datafusion::config::SpillCompression; +use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_execution::{disk_manager::RefCountedTempFile, runtime_env::RuntimeEnv}; +use datafusion_expr::ColumnarValue; +use datafusion_physical_plan::metrics::SpillMetrics; +use sedona_common::sedona_internal_err; +use sedona_schema::datatypes::SedonaType; + +use crate::{ + evaluated_batch::EvaluatedBatch, + operand_evaluator::EvaluatedGeometryArray, + utils::spill::{RecordBatchSpillReader, RecordBatchSpillWriter}, +}; + +/// Writer for spilling evaluated batches to disk +pub struct EvaluatedBatchSpillWriter { + /// The temporary spill file being written to + inner: RecordBatchSpillWriter, + + /// Schema of the spilled record batches. It is augmented from the schema of original record batches + /// The spill_schema has 4 fields: + /// * `data`: StructArray containing the original record batch columns + /// * `geom`: geometry array in storage format + /// * `dist`: distance field + spill_schema: Schema, + /// Inner fields of the "data" StructArray in the spilled record batches + data_inner_fields: Fields, +} + +const SPILL_FIELD_DATA_INDEX: usize = 0; +const SPILL_FIELD_GEOM_INDEX: usize = 1; +const SPILL_FIELD_DIST_INDEX: usize = 2; + +impl EvaluatedBatchSpillWriter { + /// Create a new SpillWriter + pub fn try_new( + env: Arc, + schema: SchemaRef, + sedona_type: &SedonaType, + request_description: &str, + compression: SpillCompression, + metrics: SpillMetrics, + batch_size_threshold: Option, + ) -> Result { + // Construct schema of record batches to be written. The written batches are augmented from the original record batches. + let data_inner_fields = schema.fields().clone(); + let data_struct_field = + Field::new("data", DataType::Struct(data_inner_fields.clone()), false); + let geom_field = sedona_type.to_storage_field("geom", true)?; + let dist_field = Field::new("dist", DataType::Float64, true); + let spill_schema = Schema::new(vec![data_struct_field, geom_field, dist_field]); + + // Create spill file + let inner = RecordBatchSpillWriter::try_new( + env, + Arc::new(spill_schema.clone()), + request_description, + compression, + metrics, + batch_size_threshold, + )?; + + Ok(Self { + inner, + spill_schema, + data_inner_fields, + }) + } + + /// Append an EvaluatedBatch to the spill file + pub fn append(&mut self, evaluated_batch: &EvaluatedBatch) -> Result<()> { + let record_batch = self.spilled_record_batch(evaluated_batch)?; + + // Splitting/compaction and spill bytes/rows metrics are handled by `RecordBatchSpillWriter`. + self.inner.write_batch(record_batch)?; + Ok(()) + } + + /// Finish writing and return the temporary file + pub fn finish(self) -> Result { + self.inner.finish() + } + + fn spilled_record_batch(&self, evaluated_batch: &EvaluatedBatch) -> Result { + let num_rows = evaluated_batch.num_rows(); + + // Store the original data batch into a StructArray + let data_batch = &evaluated_batch.batch; + let data_arrays = data_batch.columns().to_vec(); + let data_struct_array = + StructArray::try_new(self.data_inner_fields.clone(), data_arrays, None)?; + + // Store dist into a Float64Array + let mut dist_builder = arrow::array::Float64Builder::with_capacity(num_rows); + let geom_array = &evaluated_batch.geom_array; + match &geom_array.distance { + Some(ColumnarValue::Scalar(scalar)) => match scalar { + ScalarValue::Float64(dist_value) => { + for _ in 0..num_rows { + dist_builder.append_option(*dist_value); + } + } + _ => { + return sedona_internal_err!("Distance columnar value is not a Float64Array"); + } + }, + Some(ColumnarValue::Array(array)) => { + let float_array = array + .as_any() + .downcast_ref::() + .unwrap(); + dist_builder.append_array(float_array); + } + None => { + for _ in 0..num_rows { + dist_builder.append_null(); + } + } + } + let dist_array = dist_builder.finish(); + + // Assemble the final spilled RecordBatch + let columns = vec![ + Arc::new(data_struct_array) as Arc, + Arc::clone(&geom_array.geometry_array), + Arc::new(dist_array) as Arc, + ]; + let spilled_record_batch = + RecordBatch::try_new(Arc::new(self.spill_schema.clone()), columns)?; + Ok(spilled_record_batch) + } +} +/// Reader for reading spilled evaluated batches from disk +pub struct EvaluatedBatchSpillReader { + inner: RecordBatchSpillReader, +} +impl EvaluatedBatchSpillReader { + /// Create a new SpillReader + pub fn try_new(temp_file: &RefCountedTempFile) -> Result { + Ok(Self { + inner: RecordBatchSpillReader::try_new(temp_file)?, + }) + } + + /// Get the schema of the spilled data + pub fn schema(&self) -> SchemaRef { + self.inner.schema() + } + + /// Read the next EvaluatedBatch from the spill file + #[allow(unused)] + pub fn next_batch(&mut self) -> Option> { + self.next_raw_batch() + .map(|record_batch| record_batch.and_then(spilled_batch_to_evaluated_batch)) + } + + /// Read the next raw RecordBatch from the spill file + pub fn next_raw_batch(&mut self) -> Option> { + self.inner.next_batch() + } +} + +pub(crate) fn spilled_batch_to_evaluated_batch( + record_batch: RecordBatch, +) -> Result { + // Extract the data struct array (column 0) and convert back to the original RecordBatch + let data_array = record_batch + .column(SPILL_FIELD_DATA_INDEX) + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal("Expected data column to be a StructArray".to_string()) + })?; + + let data_schema = Arc::new(Schema::new(match data_array.data_type() { + DataType::Struct(fields) => fields.clone(), + _ => { + return Err(DataFusionError::Internal( + "Expected data column to have Struct data type".to_string(), + )) + } + })); + + let data_columns = (0..data_array.num_columns()) + .map(|i| Arc::clone(data_array.column(i))) + .collect::>(); + + let batch = RecordBatch::try_new(data_schema, data_columns)?; + + // Extract the geometry array (column 1) + let geom_array = Arc::clone(record_batch.column(SPILL_FIELD_GEOM_INDEX)); + + // Determine the SedonaType from the geometry field in the record batch schema + let schema = record_batch.schema(); + let geom_field = schema.field(SPILL_FIELD_GEOM_INDEX); + let sedona_type = SedonaType::from_storage_field(geom_field)?; + + // Extract the distance array (column 3) and convert back to ColumnarValue + let dist_array = record_batch + .column(SPILL_FIELD_DIST_INDEX) + .as_any() + .downcast_ref::() + .ok_or_else(|| { + DataFusionError::Internal("Expected dist column to be Float64Array".to_string()) + })?; + + let distance = if !dist_array.is_empty() { + // Check if all values are the same (scalar case) + let first_value = if dist_array.is_null(0) { + None + } else { + Some(dist_array.value(0)) + }; + + let all_same = (1..dist_array.len()).all(|i| { + let current_value = if dist_array.is_null(i) { + None + } else { + Some(dist_array.value(i)) + }; + current_value == first_value + }); + + if all_same { + Some(ColumnarValue::Scalar(ScalarValue::Float64(first_value))) + } else { + Some(ColumnarValue::Array(Arc::clone( + record_batch.column(SPILL_FIELD_DIST_INDEX), + ))) + } + } else { + None + }; + + // Create EvaluatedGeometryArray + let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?; + geom_array.distance = distance; + + Ok(EvaluatedBatch { batch, geom_array }) +} + +pub(crate) fn spilled_schema_to_evaluated_schema(spilled_schema: &SchemaRef) -> Result { + if spilled_schema.fields().is_empty() { + return Ok(SchemaRef::new(Schema::empty())); + } + + let data_field = spilled_schema.field(SPILL_FIELD_DATA_INDEX); + let inner_fields = match data_field.data_type() { + DataType::Struct(fields) => fields.clone(), + _ => { + return sedona_internal_err!("Invalid schema of spilled file: {:?}", spilled_schema); + } + }; + Ok(SchemaRef::new(Schema::new(inner_fields))) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::utils::arrow_utils::get_record_batch_memory_size; + use arrow_array::{ArrayRef, BinaryArray, Int32Array, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion_common::Result; + use datafusion_execution::runtime_env::RuntimeEnv; + use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; + use sedona_schema::datatypes::WKB_GEOMETRY; + use std::sync::Arc; + + fn create_test_runtime_env() -> Result> { + Ok(Arc::new(RuntimeEnv::default())) + } + + fn create_test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])) + } + + fn create_test_record_batch() -> Result { + let schema = create_test_schema(); + let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])); + let name_array = Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob"), None])); + RecordBatch::try_new(schema, vec![id_array, name_array]).map_err(|e| e.into()) + } + + fn create_test_geometry_array() -> Result<(ArrayRef, SedonaType)> { + // Create WKB encoded points (simple binary data for testing) + // WKB for POINT (1 2): 01 01000000 0000000000000000F03F 0000000000000040 + let point1_wkb: Vec = vec![ + 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 240, 63, 0, 0, 0, 0, 0, 0, 0, 64, + ]; + let point2_wkb: Vec = vec![ + 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 8, 64, 0, 0, 0, 0, 0, 0, 16, 64, + ]; + let point3_wkb: Vec = vec![ + 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0, 0, 0, 0, 0, 24, 64, + ]; + + let sedona_type = WKB_GEOMETRY; + let geom_array: ArrayRef = Arc::new(BinaryArray::from(vec![ + Some(point1_wkb.as_slice()), + Some(point2_wkb.as_slice()), + Some(point3_wkb.as_slice()), + ])); + + Ok((geom_array, sedona_type)) + } + + fn create_test_evaluated_batch() -> Result { + let batch = create_test_record_batch()?; + let (geom_array, sedona_type) = create_test_geometry_array()?; + let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?; + + // Add distance as a scalar value + geom_array.distance = Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(10.0)))); + + Ok(EvaluatedBatch { batch, geom_array }) + } + + fn create_test_evaluated_batch_with_array_distance() -> Result { + let batch = create_test_record_batch()?; + let (geom_array, sedona_type) = create_test_geometry_array()?; + let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?; + + // Add distance as an array value + let dist_array = Arc::new(Float64Array::from(vec![Some(1.0), Some(2.0), Some(3.0)])); + geom_array.distance = Some(ColumnarValue::Array(dist_array)); + + Ok(EvaluatedBatch { batch, geom_array }) + } + + fn create_test_evaluated_batch_with_nulls() -> Result { + let schema = create_test_schema(); + let id_array = Arc::new(Int32Array::from(vec![1, 2, 3])); + let name_array = Arc::new(StringArray::from(vec![ + Some("Alice"), + None, + Some("Charlie"), + ])); + let batch = RecordBatch::try_new(schema, vec![id_array, name_array]) + .map_err(DataFusionError::from)?; + + // Create geometry array with null in the middle + let point1_wkb: Vec = vec![ + 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 240, 63, 0, 0, 0, 0, 0, 0, 0, 64, + ]; + let point3_wkb: Vec = vec![ + 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 20, 64, 0, 0, 0, 0, 0, 0, 24, 64, + ]; + + let sedona_type = WKB_GEOMETRY; + let geom_array: ArrayRef = Arc::new(BinaryArray::from(vec![ + Some(point1_wkb.as_slice()), + None, + Some(point3_wkb.as_slice()), + ])); + + let mut geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?; + + // Add distance with nulls + let dist_array = Arc::new(Float64Array::from(vec![Some(1.0), None, Some(3.0)])); + geom_array.distance = Some(ColumnarValue::Array(dist_array)); + + Ok(EvaluatedBatch { batch, geom_array }) + } + + #[test] + fn test_spill_writer_creation() -> Result<()> { + let env = create_test_runtime_env()?; + let schema = create_test_schema(); + let sedona_type = WKB_GEOMETRY; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + &sedona_type, + "test_spill", + SpillCompression::Uncompressed, + metrics, + None, + )?; + + // Verify the spill schema has the expected structure + assert_eq!(writer.spill_schema.fields().len(), 3); + assert_eq!( + writer.spill_schema.field(SPILL_FIELD_DATA_INDEX).name(), + "data" + ); + assert_eq!( + writer.spill_schema.field(SPILL_FIELD_GEOM_INDEX).name(), + "geom" + ); + assert_eq!( + writer.spill_schema.field(SPILL_FIELD_DIST_INDEX).name(), + "dist" + ); + + Ok(()) + } + + #[test] + fn test_spill_write_and_read_basic() -> Result<()> { + let env = create_test_runtime_env()?; + let schema = create_test_schema(); + let sedona_type = WKB_GEOMETRY; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + &sedona_type, + "test_spill", + SpillCompression::Uncompressed, + metrics, + None, + )?; + + let evaluated_batch = create_test_evaluated_batch()?; + let original_num_rows = evaluated_batch.num_rows(); + + writer.append(&evaluated_batch)?; + let temp_file = writer.finish()?; + + // Read back the spilled data + let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?; + let read_batch_result = reader.next_batch(); + + assert!(read_batch_result.is_some()); + let read_batch = read_batch_result.unwrap()?; + + // Verify the data + assert_eq!(read_batch.num_rows(), original_num_rows); + assert_eq!(read_batch.batch.num_columns(), 2); // id and name columns + + // Verify that there are no more batches + assert!(reader.next_batch().is_none()); + + Ok(()) + } + + #[test] + fn test_spill_write_and_read_with_array_distance() -> Result<()> { + let env = create_test_runtime_env()?; + let schema = create_test_schema(); + let sedona_type = WKB_GEOMETRY; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + &sedona_type, + "test_spill", + SpillCompression::Uncompressed, + metrics, + None, + )?; + + let evaluated_batch = create_test_evaluated_batch_with_array_distance()?; + writer.append(&evaluated_batch)?; + let temp_file = writer.finish()?; + + // Read back the spilled data + let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?; + let read_batch = reader.next_batch().unwrap()?; + + // Verify distance is read back as array + match &read_batch.geom_array.distance { + Some(ColumnarValue::Array(array)) => { + let float_array = array.as_any().downcast_ref::().unwrap(); + assert_eq!(float_array.len(), 3); + assert_eq!(float_array.value(0), 1.0); + assert_eq!(float_array.value(1), 2.0); + assert_eq!(float_array.value(2), 3.0); + } + _ => panic!("Expected distance to be an array"), + } + + Ok(()) + } + + #[test] + fn test_spill_write_and_read_with_nulls() -> Result<()> { + let env = create_test_runtime_env()?; + let schema = create_test_schema(); + let sedona_type = WKB_GEOMETRY; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + &sedona_type, + "test_spill", + SpillCompression::Uncompressed, + metrics, + None, + )?; + + let evaluated_batch = create_test_evaluated_batch_with_nulls()?; + writer.append(&evaluated_batch)?; + let temp_file = writer.finish()?; + + // Read back the spilled data + let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?; + let read_batch = reader.next_batch().unwrap()?; + + // Verify nulls are preserved + assert_eq!(read_batch.num_rows(), 3); + assert!(read_batch.geom_array.rects[1].is_none()); // Null geometry + + // Verify distance nulls + match &read_batch.geom_array.distance { + Some(ColumnarValue::Array(array)) => { + let float_array = array.as_any().downcast_ref::().unwrap(); + assert!(float_array.is_valid(0)); + assert!(float_array.is_null(1)); + assert!(float_array.is_valid(2)); + } + _ => panic!("Expected distance to be an array"), + } + + Ok(()) + } + + #[test] + fn test_spill_multiple_batches() -> Result<()> { + let env = create_test_runtime_env()?; + let schema = create_test_schema(); + let sedona_type = WKB_GEOMETRY; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + &sedona_type, + "test_spill", + SpillCompression::Uncompressed, + metrics, + None, + )?; + + // Write multiple batches + let batch1 = create_test_evaluated_batch()?; + let batch2 = create_test_evaluated_batch_with_array_distance()?; + let batch3 = create_test_evaluated_batch_with_nulls()?; + + writer.append(&batch1)?; + writer.append(&batch2)?; + writer.append(&batch3)?; + let temp_file = writer.finish()?; + + // Read back all batches + let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?; + + let read_batch1 = reader.next_batch().unwrap()?; + assert_eq!(read_batch1.num_rows(), 3); + + let read_batch2 = reader.next_batch().unwrap()?; + assert_eq!(read_batch2.num_rows(), 3); + + let read_batch3 = reader.next_batch().unwrap()?; + assert_eq!(read_batch3.num_rows(), 3); + + // Verify no more batches + assert!(reader.next_batch().is_none()); + + Ok(()) + } + + #[test] + fn test_spill_metrics_updated() -> Result<()> { + let env = create_test_runtime_env()?; + let schema = create_test_schema(); + let sedona_type = WKB_GEOMETRY; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + &sedona_type, + "test_spill", + SpillCompression::Uncompressed, + metrics.clone(), + None, + )?; + + let evaluated_batch = create_test_evaluated_batch()?; + writer.append(&evaluated_batch)?; + writer.finish()?; + + // Verify spill metrics were updated + assert!(metrics.spilled_rows.value() > 0); + assert!(metrics.spilled_bytes.value() > 0); + + // Verify spill file count was updated + assert_eq!(metrics.spill_file_count.value(), 1); + + Ok(()) + } + + #[test] + fn test_spill_rect_preservation() -> Result<()> { + let env = create_test_runtime_env()?; + let schema = create_test_schema(); + let sedona_type = WKB_GEOMETRY; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + &sedona_type, + "test_spill", + SpillCompression::Uncompressed, + metrics, + None, + )?; + + let evaluated_batch = create_test_evaluated_batch()?; + let original_rects = evaluated_batch.rects().clone(); + + writer.append(&evaluated_batch)?; + let temp_file = writer.finish()?; + + // Read back and verify rects + let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?; + let read_batch = reader.next_batch().unwrap()?; + + assert_eq!(read_batch.rects().len(), original_rects.len()); + for (original, read) in original_rects.iter().zip(read_batch.rects().iter()) { + match (original, read) { + (Some(orig_rect), Some(read_rect)) => { + assert_eq!(orig_rect.min().x, read_rect.min().x); + assert_eq!(orig_rect.min().y, read_rect.min().y); + assert_eq!(orig_rect.max().x, read_rect.max().x); + assert_eq!(orig_rect.max().y, read_rect.max().y); + } + (None, None) => {} + _ => panic!("Rect mismatch between original and read"), + } + } + + Ok(()) + } + + #[test] + fn test_spill_scalar_distance_preserved() -> Result<()> { + let env = create_test_runtime_env()?; + let schema = create_test_schema(); + let sedona_type = WKB_GEOMETRY; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + &sedona_type, + "test_spill", + SpillCompression::Uncompressed, + metrics, + None, + )?; + + let evaluated_batch = create_test_evaluated_batch()?; + writer.append(&evaluated_batch)?; + let temp_file = writer.finish()?; + + // Read back and verify scalar distance is preserved + let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?; + let read_batch = reader.next_batch().unwrap()?; + + match &read_batch.geom_array.distance { + Some(ColumnarValue::Scalar(ScalarValue::Float64(Some(val)))) => { + assert_eq!(*val, 10.0); + } + _ => panic!("Expected scalar distance value"), + } + + Ok(()) + } + + #[test] + fn test_spill_empty_batch() -> Result<()> { + let env = create_test_runtime_env()?; + let schema = create_test_schema(); + let sedona_type = WKB_GEOMETRY; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema.clone(), + &sedona_type, + "test_spill", + SpillCompression::Uncompressed, + metrics, + None, + )?; + + // Create an empty batch + let id_array = Arc::new(Int32Array::from(Vec::::new())); + let name_array = Arc::new(StringArray::from(Vec::>::new())); + let empty_batch = RecordBatch::try_new(schema, vec![id_array, name_array]) + .map_err(DataFusionError::from)?; + + let geom_array: ArrayRef = Arc::new(BinaryArray::from(Vec::>::new())); + let geom_array = EvaluatedGeometryArray::try_new(geom_array, &sedona_type)?; + + let evaluated_batch = EvaluatedBatch { + batch: empty_batch, + geom_array, + }; + + writer.append(&evaluated_batch)?; + let temp_file = writer.finish()?; + + // Read back and verify + let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?; + let read_batch = reader.next_batch().unwrap()?; + assert_eq!(read_batch.num_rows(), 0); + + Ok(()) + } + + #[test] + fn test_spill_batch_splitting() -> Result<()> { + let env = create_test_runtime_env()?; + let schema = create_test_schema(); + let sedona_type = WKB_GEOMETRY; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + // Create a batch + let evaluated_batch = create_test_evaluated_batch()?; + let batch_size = get_record_batch_memory_size(&evaluated_batch.batch)?; + + // Set threshold to be smaller than batch size, so it splits into at least 2 parts + let threshold = batch_size / 2; + + let mut writer = EvaluatedBatchSpillWriter::try_new( + env, + schema, + &sedona_type, + "test_spill", + SpillCompression::Uncompressed, + metrics, + Some(threshold), + )?; + + writer.append(&evaluated_batch)?; + let temp_file = writer.finish()?; + + // Read back the spilled data + let mut reader = EvaluatedBatchSpillReader::try_new(&temp_file)?; + + // We expect multiple batches + let mut num_batches = 0; + let mut total_rows = 0; + while let Some(batch_result) = reader.next_batch() { + let batch = batch_result?; + num_batches += 1; + total_rows += batch.num_rows(); + } + + assert!( + num_batches > 1, + "Batch should have been split into multiple batches" + ); + assert_eq!( + total_rows, + evaluated_batch.num_rows(), + "Total rows should match" + ); + + Ok(()) + } +} diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs b/rust/sedona-spatial-join/src/index/build_side_collector.rs index a1643834b..9230aaf49 100644 --- a/rust/sedona-spatial-join/src/index/build_side_collector.rs +++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs @@ -140,7 +140,7 @@ impl BuildSideBatchesCollector { self.collect_all_concurrently(streams, reservations, metrics_vec) .await } else { - self.collect_all_sequential(streams, reservations, metrics_vec) + self.collect_all_sequentially(streams, reservations, metrics_vec) .await } } @@ -187,7 +187,7 @@ impl BuildSideBatchesCollector { Ok(partitions.into_iter().map(|v| v.unwrap()).collect()) } - async fn collect_all_sequential( + async fn collect_all_sequentially( &self, streams: Vec, reservations: Vec, diff --git a/rust/sedona-spatial-join/src/lib.rs b/rust/sedona-spatial-join/src/lib.rs index 0c99b91b3..94af3f225 100644 --- a/rust/sedona-spatial-join/src/lib.rs +++ b/rust/sedona-spatial-join/src/lib.rs @@ -16,7 +16,7 @@ // under the License. mod build_index; -mod evaluated_batch; +pub mod evaluated_batch; pub mod exec; mod index; pub mod operand_evaluator; diff --git a/rust/sedona-spatial-join/src/operand_evaluator.rs b/rust/sedona-spatial-join/src/operand_evaluator.rs index b76e91165..8b4313962 100644 --- a/rust/sedona-spatial-join/src/operand_evaluator.rs +++ b/rust/sedona-spatial-join/src/operand_evaluator.rs @@ -90,7 +90,7 @@ pub(crate) fn create_operand_evaluator( } /// Result of evaluating a geometry batch. -pub(crate) struct EvaluatedGeometryArray { +pub struct EvaluatedGeometryArray { /// The array of geometries produced by evaluating the geometry expression. pub geometry_array: ArrayRef, /// The rects of the geometries in the geometry array. The length of this array is equal to the number of geometries. diff --git a/rust/sedona-spatial-join/src/utils.rs b/rust/sedona-spatial-join/src/utils.rs index 6db8f85a4..3f53df474 100644 --- a/rust/sedona-spatial-join/src/utils.rs +++ b/rust/sedona-spatial-join/src/utils.rs @@ -21,3 +21,4 @@ pub(crate) mod concurrent_reservation; pub(crate) mod init_once_array; pub(crate) mod join_utils; pub(crate) mod once_fut; +pub(crate) mod spill; diff --git a/rust/sedona-spatial-join/src/utils/arrow_utils.rs b/rust/sedona-spatial-join/src/utils/arrow_utils.rs index 367568fc2..258b1f8ab 100644 --- a/rust/sedona-spatial-join/src/utils/arrow_utils.rs +++ b/rust/sedona-spatial-join/src/utils/arrow_utils.rs @@ -21,10 +21,21 @@ use arrow::array::{Array, ArrayData, BinaryViewArray, ListArray, RecordBatch, St use arrow_array::make_array; use arrow_array::ArrayRef; use arrow_array::StructArray; +use arrow_schema::SchemaRef; use arrow_schema::{ArrowError, DataType}; use datafusion_common::Result; use sedona_common::sedona_internal_err; +/// Checks if the schema contains any view types (Utf8View or BinaryView). Batches +/// with view types may need special handling (e.g. compaction) before spilling +/// or holding in memory for extended periods. +pub(crate) fn schema_contains_view_types(schema: &SchemaRef) -> bool { + schema + .flattened_fields() + .iter() + .any(|field| matches!(field.data_type(), DataType::Utf8View | DataType::BinaryView)) +} + /// Reconstruct `batch` to organize the payload buffers of each `StringViewArray` and /// `BinaryViewArray` in sequential order by calling `gc()` on them. /// @@ -246,9 +257,55 @@ mod tests { use arrow_array::{ BinaryViewArray, BooleanArray, ListArray, StringArray, StringViewArray, StructArray, }; - use arrow_schema::{DataType, Field, Schema}; + use arrow_schema::{DataType, Field, Fields, Schema}; use std::sync::Arc; + fn make_schema(fields: Vec) -> SchemaRef { + Arc::new(Schema::new(fields)) + } + + #[test] + fn test_schema_contains_view_types_top_level() { + let schema_ref = make_schema(vec![ + Field::new("a", DataType::Utf8View, true), + Field::new("b", DataType::BinaryView, true), + ]); + + assert!(schema_contains_view_types(&schema_ref)); + + // Similar shape but without view types + let schema_no_view = make_schema(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Binary, true), + ]); + assert!(!schema_contains_view_types(&schema_no_view)); + } + + #[test] + fn test_schema_contains_view_types_nested() { + let nested = Field::new( + "s", + DataType::Struct(Fields::from(vec![Field::new( + "v", + DataType::Utf8View, + true, + )])), + true, + ); + + let schema_ref = make_schema(vec![nested]); + assert!(schema_contains_view_types(&schema_ref)); + + // Nested struct without any view types + let nested_no_view = Field::new( + "s", + DataType::Struct(Fields::from(vec![Field::new("v", DataType::Utf8, true)])), + true, + ); + let schema_no_view = make_schema(vec![nested_no_view]); + assert!(!schema_contains_view_types(&schema_no_view)); + } + #[test] fn test_string_view_array_memory_size() { let array = StringViewArray::from(vec![ @@ -523,12 +580,12 @@ mod tests { let array: ArrayRef = Arc::new(struct_array); let slice = array.slice(0, 2); - let before_size = get_array_memory_size(&array).unwrap(); + let before_size = slice.get_array_memory_size(); let (compacted, mutated) = compact_array(Arc::new(slice)).unwrap(); assert!(mutated); - let after_size = get_array_memory_size(&compacted).unwrap(); + let after_size = compacted.get_array_memory_size(); assert!(after_size < before_size); } @@ -548,12 +605,12 @@ mod tests { } let bv_list: ListArray = bv_list_builder.finish(); let sliced: ArrayRef = Arc::new(bv_list.slice(0, 1)); - let before_size = get_array_memory_size(&sliced).unwrap(); + let before_size = sliced.get_array_memory_size(); let (compacted, mutated) = compact_array(Arc::clone(&sliced)).unwrap(); assert!(mutated); - let after_size = get_array_memory_size(&compacted).unwrap(); + let after_size = compacted.get_array_memory_size(); assert!(after_size <= before_size); } diff --git a/rust/sedona-spatial-join/src/utils/spill.rs b/rust/sedona-spatial-join/src/utils/spill.rs new file mode 100644 index 000000000..fef585a5d --- /dev/null +++ b/rust/sedona-spatial-join/src/utils/spill.rs @@ -0,0 +1,382 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{fs::File, io::BufReader, sync::Arc}; + +use arrow::ipc::{ + reader::StreamReader, + writer::{IpcWriteOptions, StreamWriter}, +}; +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; +use datafusion::config::SpillCompression; +use datafusion_common::{DataFusionError, Result}; +use datafusion_execution::{disk_manager::RefCountedTempFile, runtime_env::RuntimeEnv}; +use datafusion_physical_plan::metrics::SpillMetrics; + +use crate::utils::arrow_utils::{ + compact_batch, get_record_batch_memory_size, schema_contains_view_types, +}; + +/// Generic Arrow IPC stream spill writer for [`RecordBatch`]. +/// +/// Shared between multiple components so spill metrics are updated consistently. +pub(crate) struct RecordBatchSpillWriter { + in_progress_file: RefCountedTempFile, + writer: StreamWriter, + metrics: SpillMetrics, + batch_size_threshold: Option, + gc_view_arrays: bool, +} + +impl RecordBatchSpillWriter { + pub fn try_new( + env: Arc, + schema: SchemaRef, + request_description: &str, + compression: SpillCompression, + metrics: SpillMetrics, + batch_size_threshold: Option, + ) -> Result { + let in_progress_file = env.disk_manager.create_tmp_file(request_description)?; + let file = File::create(in_progress_file.path())?; + + let mut write_options = IpcWriteOptions::default(); + write_options = write_options.try_with_compression(compression.into())?; + + let writer = StreamWriter::try_new_with_options(file, schema.as_ref(), write_options)?; + metrics.spill_file_count.add(1); + + let gc_view_arrays = schema_contains_view_types(&schema); + + Ok(Self { + in_progress_file, + writer, + metrics, + batch_size_threshold, + gc_view_arrays, + }) + } + + /// Write a record batch to the spill file. + /// + /// If `batch_size_threshold` is configured and the in-memory size of the batch exceeds the + /// threshold, this will automatically split the batch into smaller slices and (optionally) + /// compact each slice before writing. + pub fn write_batch(&mut self, batch: RecordBatch) -> Result<()> { + let num_rows = batch.num_rows(); + if num_rows == 0 { + // Preserve "empty batch" semantics: callers may rely on spilling and reading back a + // zero-row batch (e.g. as a sentinel for an empty stream). + return self.write_one_batch(batch); + } + + let rows_per_split = self.calculate_rows_per_split(&batch, num_rows)?; + if rows_per_split < num_rows { + let mut offset = 0; + while offset < num_rows { + let length = std::cmp::min(rows_per_split, num_rows - offset); + let slice = batch.slice(offset, length); + self.write_one_batch(slice)?; + offset += length; + } + } else { + self.write_one_batch(batch)?; + } + Ok(()) + } + + fn calculate_rows_per_split(&self, batch: &RecordBatch, num_rows: usize) -> Result { + let Some(threshold) = self.batch_size_threshold else { + return Ok(num_rows); + }; + if threshold == 0 { + return Ok(num_rows); + } + + let batch_size = get_record_batch_memory_size(batch)?; + if batch_size <= threshold { + return Ok(num_rows); + } + + let num_splits = batch_size.div_ceil(threshold); + let rows = num_rows.div_ceil(num_splits); + Ok(std::cmp::max(1, rows)) + } + + fn write_one_batch(&mut self, batch: RecordBatch) -> Result<()> { + // Writing record batches containing sparse binary view arrays may lead to excessive + // disk usage and slow read performance later. Compact such batches before writing. + let batch = if self.gc_view_arrays { + compact_batch(batch)? + } else { + batch + }; + self.writer.write(&batch).map_err(|e| { + DataFusionError::Execution(format!( + "Failed to write RecordBatch to spill file {:?}: {}", + self.in_progress_file.path(), + e + )) + })?; + + self.metrics.spilled_rows.add(batch.num_rows()); + Ok(()) + } + + pub fn finish(mut self) -> Result { + self.writer.finish()?; + + let mut in_progress_file = self.in_progress_file; + in_progress_file.update_disk_usage()?; + let size = in_progress_file.current_disk_usage(); + self.metrics.spilled_bytes.add(size as usize); + Ok(in_progress_file) + } +} + +/// Generic Arrow IPC stream spill reader for [`RecordBatch`]. +pub(crate) struct RecordBatchSpillReader { + stream_reader: StreamReader>, +} + +impl RecordBatchSpillReader { + pub fn try_new(temp_file: &RefCountedTempFile) -> Result { + let file = File::open(temp_file.path())?; + let mut stream_reader = StreamReader::try_new_buffered(file, None)?; + + // SAFETY: spill writers in this crate strictly follow Arrow IPC specifications. + // Skip redundant validation during read to speed up. + unsafe { + stream_reader = stream_reader.with_skip_validation(true); + } + + Ok(Self { stream_reader }) + } + + pub fn schema(&self) -> SchemaRef { + self.stream_reader.schema() + } + + pub fn next_batch(&mut self) -> Option> { + self.stream_reader + .next() + .map(|result| result.map_err(|e| e.into())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::builder::BinaryViewBuilder; + use arrow_array::{Int32Array, StringArray}; + use arrow_schema::{DataType, Field, Schema}; + use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; + + fn create_test_runtime_env() -> Result> { + Ok(Arc::new(RuntimeEnv::default())) + } + + fn create_test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("name", DataType::Utf8, true), + ])) + } + + fn create_test_record_batch(num_rows: usize) -> RecordBatch { + let ids: Int32Array = (0..num_rows as i32).collect(); + + let names: StringArray = (0..num_rows) + .map(|i| { + if i % 3 == 0 { + None + } else { + Some(format!("name_{i}")) + } + }) + .collect(); + + RecordBatch::try_new(create_test_schema(), vec![Arc::new(ids), Arc::new(names)]).unwrap() + } + + fn create_test_binary_view_batch(num_rows: usize, value_len: usize) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![Field::new( + "payload", + DataType::BinaryView, + false, + )])); + + let mut builder = BinaryViewBuilder::new(); + for i in 0..num_rows { + let byte = b'a' + (i % 26) as u8; + let bytes = vec![byte; value_len]; + builder.append_value(bytes.as_slice()); + } + + let array = Arc::new(builder.finish()); + RecordBatch::try_new(schema, vec![array]).unwrap() + } + + #[test] + fn test_record_batch_spill_empty_batch_round_trip() -> Result<()> { + let env = create_test_runtime_env()?; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let schema = create_test_schema(); + let mut writer = RecordBatchSpillWriter::try_new( + env, + schema.clone(), + "test_record_batch_spill_empty", + SpillCompression::Uncompressed, + metrics.clone(), + None, + )?; + + let empty = create_test_record_batch(0); + writer.write_batch(empty)?; + let file = writer.finish()?; + + assert_eq!(metrics.spill_file_count.value(), 1); + assert_eq!(metrics.spilled_rows.value(), 0); + + let mut reader = RecordBatchSpillReader::try_new(&file)?; + let read = reader.next_batch().unwrap()?; + assert_eq!(read.num_rows(), 0); + assert_eq!(read.schema(), schema); + assert!(reader.next_batch().is_none()); + + Ok(()) + } + + #[test] + fn test_record_batch_spill_round_trip() -> Result<()> { + let env = create_test_runtime_env()?; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let schema = create_test_schema(); + let mut writer = RecordBatchSpillWriter::try_new( + env, + schema.clone(), + "test_record_batch_spill", + SpillCompression::Uncompressed, + metrics.clone(), + None, + )?; + + let batch1 = create_test_record_batch(5); + let batch2 = create_test_record_batch(3); + writer.write_batch(batch1)?; + writer.write_batch(batch2)?; + + let file = writer.finish()?; + + assert_eq!(metrics.spill_file_count.value(), 1); + assert_eq!(metrics.spilled_rows.value(), 8); + assert!(metrics.spilled_bytes.value() > 0); + + let mut reader = RecordBatchSpillReader::try_new(&file)?; + assert_eq!(reader.schema(), schema); + + let read1 = reader.next_batch().unwrap()?; + assert_eq!(read1.num_rows(), 5); + let read2 = reader.next_batch().unwrap()?; + assert_eq!(read2.num_rows(), 3); + assert!(reader.next_batch().is_none()); + + Ok(()) + } + + #[test] + fn test_record_batch_spill_auto_splitting() -> Result<()> { + let env = create_test_runtime_env()?; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + let schema = create_test_schema(); + // Force splitting by setting a tiny threshold. + let mut writer = RecordBatchSpillWriter::try_new( + env, + schema.clone(), + "test_record_batch_spill_split", + SpillCompression::Uncompressed, + metrics.clone(), + Some(1), + )?; + + let batch = create_test_record_batch(10); + writer.write_batch(batch)?; + let file = writer.finish()?; + + // Rows should reflect the logical input rows, even if internally split. + assert_eq!(metrics.spilled_rows.value(), 10); + assert!(metrics.spilled_bytes.value() > 0); + + // Reader should be able to read all rows back across multiple batches. + let mut reader = RecordBatchSpillReader::try_new(&file)?; + let mut total_rows = 0; + while let Some(batch) = reader.next_batch() { + total_rows += batch?.num_rows(); + } + assert_eq!(total_rows, 10); + + Ok(()) + } + + #[test] + fn test_record_batch_spill_sliced_binary_view_not_excessive() -> Result<()> { + let env = create_test_runtime_env()?; + let metrics_set = ExecutionPlanMetricsSet::new(); + let metrics = SpillMetrics::new(&metrics_set, 0); + + // Use a long payload so the view-value buffers dominate overhead, making the + // size comparison stable across platforms. + const NUM_ROWS: usize = 100; + const NUM_SLICES: usize = 10; + const VALUE_LEN: usize = 8 * 1024; + + let batch = create_test_binary_view_batch(NUM_ROWS, VALUE_LEN); + let batch_size = get_record_batch_memory_size(&batch)?; + + let mut writer = RecordBatchSpillWriter::try_new( + env, + batch.schema(), + "test_record_batch_spill_sliced_binary_view", + SpillCompression::Uncompressed, + metrics.clone(), + None, + )?; + + let rows_per_slice = NUM_ROWS / NUM_SLICES; + assert_eq!(rows_per_slice * NUM_SLICES, NUM_ROWS); + for i in 0..NUM_SLICES { + let slice = batch.slice(i * rows_per_slice, rows_per_slice); + writer.write_batch(slice)?; + } + + let file = writer.finish()?; + let spill_size = file.current_disk_usage() as usize; + assert!( + spill_size <= (batch_size as f64 * 1.2) as usize, + "spill file unexpectedly large for sliced BinaryView batch: spill_size={spill_size}, batch_size={batch_size}" + ); + + Ok(()) + } +}