Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 3 additions & 18 deletions rust/sedona-spatial-join/src/build_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,9 @@ pub async fn build_index(
collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics));
}

let build_partitions = if concurrent {
// Collect partitions concurrently using collect_all which spawns tasks
collector
.collect_all(build_streams, reservations, collect_metrics_vec)
.await?
} else {
// Collect partitions sequentially (for JNI/embedded contexts)
let mut partitions = Vec::with_capacity(num_partitions);
for ((stream, reservation), metrics) in build_streams
.into_iter()
.zip(reservations)
.zip(&collect_metrics_vec)
{
let partition = collector.collect(stream, reservation, metrics).await?;
partitions.push(partition);
}
partitions
};
let build_partitions = collector
.collect_all(build_streams, reservations, collect_metrics_vec, concurrent)
.await?;

let contains_external_stream = build_partitions
.iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

use std::pin::Pin;

use arrow_schema::SchemaRef;
use futures::Stream;

use crate::evaluated_batch::EvaluatedBatch;
Expand All @@ -27,8 +28,15 @@ use datafusion_common::Result;
pub(crate) trait EvaluatedBatchStream: Stream<Item = Result<EvaluatedBatch>> {
    /// Returns `true` if this stream is an external stream, i.e. one whose batch data was
    /// spilled to disk.
    fn is_external(&self) -> bool;

    /// Returns the schema of the records produced by this `EvaluatedBatchStream`.
    ///
    /// Implementations are expected to guarantee that every `EvaluatedBatch` yielded by this
    /// stream has the same schema as the one returned from this method.
    fn schema(&self) -> SchemaRef;
}

/// A pinned, heap-allocated [`EvaluatedBatchStream`] trait object that is also `Send`.
pub(crate) type SendableEvaluatedBatchStream = Pin<Box<dyn EvaluatedBatchStream + Send>>;

pub(crate) mod evaluate;
pub(crate) mod in_mem;
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow_array::RecordBatch;
use arrow_schema::{DataType, SchemaRef};
use datafusion_common::Result;
use datafusion_physical_plan::{metrics, SendableRecordBatchStream};
use futures::{Stream, StreamExt};

use crate::evaluated_batch::{
evaluated_batch_stream::{EvaluatedBatchStream, SendableEvaluatedBatchStream},
EvaluatedBatch,
};
use crate::operand_evaluator::{EvaluatedGeometryArray, OperandEvaluator};
use crate::utils::arrow_utils::compact_batch;

/// An evaluator that can evaluate geometry expressions on record batches
/// and produces evaluated geometry arrays.
///
/// The `Unpin` supertrait bound lets streams that own an evaluator call
/// `Pin::get_mut` on themselves inside `poll_next`.
trait Evaluator: Unpin {
    /// Evaluates the geometry expression against `batch` and returns the resulting
    /// geometry array, or the error raised by the underlying evaluation.
    fn evaluate(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray>;
}

/// An evaluator for build-side geometry expressions.
struct BuildSideEvaluator {
    /// Shared operand evaluator whose `evaluate_build` method is invoked for each batch.
    evaluator: Arc<dyn OperandEvaluator>,
}

impl Evaluator for BuildSideEvaluator {
    /// Delegates to `OperandEvaluator::evaluate_build` for the given batch.
    fn evaluate(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
        self.evaluator.evaluate_build(batch)
    }
}

/// An evaluator for probe-side geometry expressions.
struct ProbeSideEvaluator {
    /// Shared operand evaluator whose `evaluate_probe` method is invoked for each batch.
    evaluator: Arc<dyn OperandEvaluator>,
}

impl Evaluator for ProbeSideEvaluator {
    /// Delegates to `OperandEvaluator::evaluate_probe` for the given batch.
    fn evaluate(&self, batch: &RecordBatch) -> Result<EvaluatedGeometryArray> {
        self.evaluator.evaluate_probe(batch)
    }
}

/// Wraps a `SendableRecordBatchStream` and eagerly evaluates a geometry expression on
/// every batch through the configured [`Evaluator`] (build side or probe side), so
/// downstream consumers can operate on `EvaluatedBatch`s.
struct EvaluateOperandBatchStream<E: Evaluator> {
    /// Upstream stream of record batches to evaluate.
    inner: SendableRecordBatchStream,
    /// Strategy used to compute the geometry array for each batch.
    evaluator: E,
    /// Timer metric covering batch compaction (when enabled) and geometry evaluation.
    evaluation_time: metrics::Time,
    /// Whether each batch is passed through `compact_batch` before evaluation. The
    /// constructor only sets this when the caller asked for it and the input schema
    /// contains view types.
    gc_view_arrays: bool,
}

impl<E: Evaluator> EvaluateOperandBatchStream<E> {
    /// Builds a stream that evaluates every batch of `inner` with `evaluator`.
    ///
    /// `evaluation_time` records the time spent per batch. `gc_view_arrays` requests
    /// compaction of incoming batches; the request is honored only if the schema of
    /// `inner` actually contains view-typed columns.
    fn new(
        inner: SendableRecordBatchStream,
        evaluator: E,
        evaluation_time: metrics::Time,
        gc_view_arrays: bool,
    ) -> Self {
        // Skip the schema walk entirely when compaction was not requested.
        let compaction_enabled =
            gc_view_arrays && schema_contains_view_types(&inner.schema());

        Self {
            inner,
            evaluator,
            evaluation_time,
            gc_view_arrays: compaction_enabled,
        }
    }
}

/// Reports whether any field of `schema`, including fields nested inside other
/// fields, has a view data type (`Utf8View` or `BinaryView`).
fn schema_contains_view_types(schema: &SchemaRef) -> bool {
    schema
        .flattened_fields()
        .into_iter()
        .map(|field| field.data_type())
        .any(|data_type| matches!(data_type, DataType::Utf8View | DataType::BinaryView))
}

impl<E: Evaluator> EvaluatedBatchStream for EvaluateOperandBatchStream<E> {
fn is_external(&self) -> bool {
false
}

fn schema(&self) -> arrow_schema::SchemaRef {
self.inner.schema()
}
}

impl<E: Evaluator> Stream for EvaluateOperandBatchStream<E> {
    type Item = Result<EvaluatedBatch>;

    /// Polls the wrapped stream and, for each successfully produced batch, optionally
    /// compacts it and evaluates the geometry expression on it.
    ///
    /// Upstream errors, end-of-stream and `Pending` are passed through unchanged.
    /// Errors raised by compaction or evaluation are yielded as stream items.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        // Anything other than a freshly produced batch is forwarded as-is.
        let incoming = match this.inner.poll_next_unpin(cx) {
            Poll::Pending => return Poll::Pending,
            Poll::Ready(None) => return Poll::Ready(None),
            Poll::Ready(Some(Err(err))) => return Poll::Ready(Some(Err(err))),
            Poll::Ready(Some(Ok(incoming))) => incoming,
        };

        // The timer guard spans both compaction and evaluation; it stops when dropped.
        let _timer = this.evaluation_time.timer();
        let batch = if this.gc_view_arrays {
            compact_batch(incoming)?
        } else {
            incoming
        };
        let geom_array = this.evaluator.evaluate(&batch)?;

        Poll::Ready(Some(Ok(EvaluatedBatch { batch, geom_array })))
    }
}

/// Wraps `stream` in a `SendableEvaluatedBatchStream` that eagerly evaluates the
/// build-side geometry expression on every incoming `RecordBatch`, recording the
/// time spent in `evaluation_time`.
pub(crate) fn create_evaluated_build_stream(
    stream: SendableRecordBatchStream,
    evaluator: Arc<dyn OperandEvaluator>,
    evaluation_time: metrics::Time,
) -> SendableEvaluatedBatchStream {
    let build_side = BuildSideEvaluator { evaluator };

    // Request view-array compaction on the build side: build-side batches need to be
    // long-lived in memory during the join process, and poorly managed sparse view
    // arrays could lead to unnecessarily high memory usage or excessive spilling.
    let gc_view_arrays = true;

    let evaluated =
        EvaluateOperandBatchStream::new(stream, build_side, evaluation_time, gc_view_arrays);
    Box::pin(evaluated)
}

/// Wraps `stream` in a `SendableEvaluatedBatchStream` that eagerly evaluates the
/// probe-side geometry expression on every incoming `RecordBatch`, recording the
/// time spent in `evaluation_time`.
pub(crate) fn create_evaluated_probe_stream(
    stream: SendableRecordBatchStream,
    evaluator: Arc<dyn OperandEvaluator>,
    evaluation_time: metrics::Time,
) -> SendableEvaluatedBatchStream {
    let probe_side = ProbeSideEvaluator { evaluator };

    // View-array compaction is not requested for the probe side.
    let gc_view_arrays = false;

    let evaluated =
        EvaluateOperandBatchStream::new(stream, probe_side, evaluation_time, gc_view_arrays);
    Box::pin(evaluated)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef};

    use super::schema_contains_view_types;

    /// Wraps `fields` in a shared schema.
    fn schema(fields: Vec<Field>) -> SchemaRef {
        Arc::new(Schema::new(fields))
    }

    /// Builds a nullable struct field `s` holding one nullable child `v` of `child_type`.
    fn struct_field(child_type: DataType) -> Field {
        let child = Field::new("v", child_type, true);
        Field::new("s", DataType::Struct(Fields::from(vec![child])), true)
    }

    #[test]
    fn test_schema_contains_view_types_top_level() {
        // Top-level view columns are detected.
        let with_views = schema(vec![
            Field::new("a", DataType::Utf8View, true),
            Field::new("b", DataType::BinaryView, true),
        ]);
        assert!(schema_contains_view_types(&with_views));

        // Same shape with the non-view counterparts is not flagged.
        let without_views = schema(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Binary, true),
        ]);
        assert!(!schema_contains_view_types(&without_views));
    }

    #[test]
    fn test_schema_contains_view_types_nested() {
        // A view type nested inside a struct is detected.
        let with_views = schema(vec![struct_field(DataType::Utf8View)]);
        assert!(schema_contains_view_types(&with_views));

        // A nested struct without any view types is not flagged.
        let without_views = schema(vec![struct_field(DataType::Utf8)]);
        assert!(!schema_contains_view_types(&without_views));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,25 @@

use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
vec::IntoIter,
};

use arrow_schema::SchemaRef;
use datafusion_common::Result;

use crate::evaluated_batch::{evaluated_batch_stream::EvaluatedBatchStream, EvaluatedBatch};

/// An `EvaluatedBatchStream` backed by a vector of already-evaluated batches.
pub(crate) struct InMemoryEvaluatedBatchStream {
    /// Schema handed back by `EvaluatedBatchStream::schema`.
    schema: SchemaRef,
    /// Owning iterator over the batches supplied at construction time.
    iter: IntoIter<EvaluatedBatch>,
}

impl InMemoryEvaluatedBatchStream {
pub fn new(batches: Vec<EvaluatedBatch>) -> Self {
pub fn new(schema: SchemaRef, batches: Vec<EvaluatedBatch>) -> Self {
InMemoryEvaluatedBatchStream {
schema,
iter: batches.into_iter(),
}
}
Expand All @@ -41,6 +45,10 @@ impl EvaluatedBatchStream for InMemoryEvaluatedBatchStream {
/// Always `false`: this stream never reports itself as external.
fn is_external(&self) -> bool {
    false
}

fn schema(&self) -> arrow_schema::SchemaRef {
Arc::clone(&self.schema)
}
}

impl futures::Stream for InMemoryEvaluatedBatchStream {
Expand Down
Loading