diff --git a/rust/sedona-spatial-join/src/build_index.rs b/rust/sedona-spatial-join/src/build_index.rs index 3600d2940..f369365c5 100644 --- a/rust/sedona-spatial-join/src/build_index.rs +++ b/rust/sedona-spatial-join/src/build_index.rs @@ -71,24 +71,9 @@ pub async fn build_index( collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics)); } - let build_partitions = if concurrent { - // Collect partitions concurrently using collect_all which spawns tasks - collector - .collect_all(build_streams, reservations, collect_metrics_vec) - .await? - } else { - // Collect partitions sequentially (for JNI/embedded contexts) - let mut partitions = Vec::with_capacity(num_partitions); - for ((stream, reservation), metrics) in build_streams - .into_iter() - .zip(reservations) - .zip(&collect_metrics_vec) - { - let partition = collector.collect(stream, reservation, metrics).await?; - partitions.push(partition); - } - partitions - }; + let build_partitions = collector + .collect_all(build_streams, reservations, collect_metrics_vec, concurrent) + .await?; let contains_external_stream = build_partitions .iter() diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs index 958087f7b..eb1f855ca 100644 --- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs +++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs @@ -17,6 +17,7 @@ use std::pin::Pin; +use arrow_schema::SchemaRef; use futures::Stream; use crate::evaluated_batch::EvaluatedBatch; @@ -27,8 +28,15 @@ use datafusion_common::Result; pub(crate) trait EvaluatedBatchStream: Stream> { /// Returns true if this stream is an external stream, where batch data were spilled to disk. fn is_external(&self) -> bool; + + /// Returns the schema of records produced by this `EvaluatedBatchStream`. + /// + /// Implementation of this trait should guarantee that all `EvaluatedBatch`'s returned by this + /// stream should have the same schema as returned from this method. + fn schema(&self) -> SchemaRef; } pub(crate) type SendableEvaluatedBatchStream = Pin>; +pub(crate) mod evaluate; pub(crate) mod in_mem; diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs new file mode 100644 index 000000000..0baf3c5f5 --- /dev/null +++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/evaluate.rs @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use arrow_array::RecordBatch; +use arrow_schema::{DataType, SchemaRef}; +use datafusion_common::Result; +use datafusion_physical_plan::{metrics, SendableRecordBatchStream}; +use futures::{Stream, StreamExt}; + +use crate::evaluated_batch::{ + evaluated_batch_stream::{EvaluatedBatchStream, SendableEvaluatedBatchStream}, + EvaluatedBatch, +}; +use crate::operand_evaluator::{EvaluatedGeometryArray, OperandEvaluator}; +use crate::utils::arrow_utils::compact_batch; + +/// An evaluator that can evaluate geometry expressions on record batches +/// and produces evaluated geometry arrays. +trait Evaluator: Unpin { + fn evaluate(&self, batch: &RecordBatch) -> Result; +} + +/// An evaluator for build-side geometry expressions. +struct BuildSideEvaluator { + evaluator: Arc, +} + +impl Evaluator for BuildSideEvaluator { + fn evaluate(&self, batch: &RecordBatch) -> Result { + self.evaluator.evaluate_build(batch) + } +} + +/// An evaluator for probe-side geometry expressions. +struct ProbeSideEvaluator { + evaluator: Arc, +} + +impl Evaluator for ProbeSideEvaluator { + fn evaluate(&self, batch: &RecordBatch) -> Result { + self.evaluator.evaluate_probe(batch) + } +} + +/// Wraps a `SendableRecordBatchStream` and evaluates the probe-side geometry +/// expression eagerly so downstream consumers can operate on `EvaluatedBatch`s. +struct EvaluateOperandBatchStream { + inner: SendableRecordBatchStream, + evaluator: E, + evaluation_time: metrics::Time, + gc_view_arrays: bool, +} + +impl EvaluateOperandBatchStream { + fn new( + inner: SendableRecordBatchStream, + evaluator: E, + evaluation_time: metrics::Time, + gc_view_arrays: bool, + ) -> Self { + let gc_view_arrays = gc_view_arrays && schema_contains_view_types(&inner.schema()); + Self { + inner, + evaluator, + evaluation_time, + gc_view_arrays, + } + } +} + +/// Checks if the schema contains any view types (Utf8View or BinaryView). +fn schema_contains_view_types(schema: &SchemaRef) -> bool { + schema + .flattened_fields() + .iter() + .any(|field| matches!(field.data_type(), DataType::Utf8View | DataType::BinaryView)) +} + +impl EvaluatedBatchStream for EvaluateOperandBatchStream { + fn is_external(&self) -> bool { + false + } + + fn schema(&self) -> arrow_schema::SchemaRef { + self.inner.schema() + } +} + +impl Stream for EvaluateOperandBatchStream { + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let self_mut = self.get_mut(); + match self_mut.inner.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(batch))) => { + let _timer = self_mut.evaluation_time.timer(); + let batch = if self_mut.gc_view_arrays { + compact_batch(batch)? + } else { + batch + }; + let geom_array = self_mut.evaluator.evaluate(&batch)?; + let evaluated = EvaluatedBatch { batch, geom_array }; + Poll::Ready(Some(Ok(evaluated))) + } + Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +/// Returns a `SendableEvaluatedBatchStream` that eagerly evaluates the build-side +/// geometry expression for every incoming `RecordBatch`. +pub(crate) fn create_evaluated_build_stream( + stream: SendableRecordBatchStream, + evaluator: Arc, + evaluation_time: metrics::Time, +) -> SendableEvaluatedBatchStream { + // Enable gc_view_arrays for build-side since build-side batches needs to be long-lived + // in memory during the join process. Poorly managed sparse view arrays could lead to + // unnecessary high memory usage or excessive spilling. + Box::pin(EvaluateOperandBatchStream::new( + stream, + BuildSideEvaluator { evaluator }, + evaluation_time, + true, + )) +} + +/// Returns a `SendableEvaluatedBatchStream` that eagerly evaluates the probe-side +/// geometry expression for every incoming `RecordBatch`. +pub(crate) fn create_evaluated_probe_stream( + stream: SendableRecordBatchStream, + evaluator: Arc, + evaluation_time: metrics::Time, +) -> SendableEvaluatedBatchStream { + Box::pin(EvaluateOperandBatchStream::new( + stream, + ProbeSideEvaluator { evaluator }, + evaluation_time, + false, + )) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef}; + + use super::schema_contains_view_types; + + fn schema(fields: Vec) -> SchemaRef { + Arc::new(Schema::new(fields)) + } + + #[test] + fn test_schema_contains_view_types_top_level() { + let schema_ref = schema(vec![ + Field::new("a", DataType::Utf8View, true), + Field::new("b", DataType::BinaryView, true), + ]); + + assert!(schema_contains_view_types(&schema_ref)); + + // Similar shape but without view types + let schema_no_view = schema(vec![ + Field::new("a", DataType::Utf8, true), + Field::new("b", DataType::Binary, true), + ]); + assert!(!schema_contains_view_types(&schema_no_view)); + } + + #[test] + fn test_schema_contains_view_types_nested() { + let nested = Field::new( + "s", + DataType::Struct(Fields::from(vec![Field::new( + "v", + DataType::Utf8View, + true, + )])), + true, + ); + + let schema_ref = schema(vec![nested]); + assert!(schema_contains_view_types(&schema_ref)); + + // Nested struct without any view types + let nested_no_view = Field::new( + "s", + DataType::Struct(Fields::from(vec![Field::new("v", DataType::Utf8, true)])), + true, + ); + let schema_no_view = schema(vec![nested_no_view]); + assert!(!schema_contains_view_types(&schema_no_view)); + } +} diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs index 57671547b..550e308a2 100644 --- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs +++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs @@ -17,21 +17,25 @@ use std::{ pin::Pin, + sync::Arc, task::{Context, Poll}, vec::IntoIter, }; +use arrow_schema::SchemaRef; use datafusion_common::Result; use crate::evaluated_batch::{evaluated_batch_stream::EvaluatedBatchStream, EvaluatedBatch}; pub(crate) struct InMemoryEvaluatedBatchStream { + schema: SchemaRef, iter: IntoIter, } impl InMemoryEvaluatedBatchStream { - pub fn new(batches: Vec) -> Self { + pub fn new(schema: SchemaRef, batches: Vec) -> Self { InMemoryEvaluatedBatchStream { + schema, iter: batches.into_iter(), } } @@ -41,6 +45,10 @@ impl EvaluatedBatchStream for InMemoryEvaluatedBatchStream { fn is_external(&self) -> bool { false } + + fn schema(&self) -> arrow_schema::SchemaRef { + Arc::clone(&self.schema) + } } impl futures::Stream for InMemoryEvaluatedBatchStream { diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs b/rust/sedona-spatial-join/src/index/build_side_collector.rs index ab3e8e27f..a1643834b 100644 --- a/rust/sedona-spatial-join/src/index/build_side_collector.rs +++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs @@ -29,7 +29,8 @@ use sedona_schema::datatypes::WKB_GEOMETRY; use crate::{ evaluated_batch::{ evaluated_batch_stream::{ - in_mem::InMemoryEvaluatedBatchStream, SendableEvaluatedBatchStream, + evaluate::create_evaluated_build_stream, in_mem::InMemoryEvaluatedBatchStream, + SendableEvaluatedBatchStream, }, EvaluatedBatch, }, @@ -88,30 +89,23 @@ impl BuildSideBatchesCollector { pub async fn collect( &self, - mut stream: SendableRecordBatchStream, + mut stream: SendableEvaluatedBatchStream, mut reservation: MemoryReservation, metrics: &CollectBuildSideMetrics, ) -> Result { - let evaluator = self.evaluator.as_ref(); let mut in_mem_batches: Vec = Vec::new(); let mut analyzer = AnalyzeAccumulator::new(WKB_GEOMETRY, WKB_GEOMETRY); - while let Some(record_batch) = stream.next().await { - let record_batch = record_batch?; + while let Some(evaluated_batch) = stream.next().await { + let build_side_batch = evaluated_batch?; let _timer = metrics.time_taken.timer(); // Process the record batch and create a BuildSideBatch - let geom_array = evaluator.evaluate_build(&record_batch)?; - + let geom_array = &build_side_batch.geom_array; for wkb in geom_array.wkbs().iter().flatten() { analyzer.update_statistics(wkb, wkb.buf().len())?; } - let build_side_batch = EvaluatedBatch { - batch: record_batch, - geom_array, - }; - let in_mem_size = build_side_batch.in_mem_size()?; metrics.num_batches.add(1); metrics.num_rows.add(build_side_batch.num_rows()); @@ -122,7 +116,10 @@ impl BuildSideBatchesCollector { } Ok(BuildPartition { - build_side_batch_stream: Box::pin(InMemoryEvaluatedBatchStream::new(in_mem_batches)), + build_side_batch_stream: Box::pin(InMemoryEvaluatedBatchStream::new( + stream.schema(), + in_mem_batches, + )), geo_statistics: analyzer.finish(), reservation, }) @@ -133,12 +130,28 @@ impl BuildSideBatchesCollector { streams: Vec, reservations: Vec, metrics_vec: Vec, + concurrent: bool, ) -> Result> { if streams.is_empty() { return Ok(vec![]); } - // Spawn all tasks to scan all build streams concurrently + if concurrent { + self.collect_all_concurrently(streams, reservations, metrics_vec) + .await + } else { + self.collect_all_sequential(streams, reservations, metrics_vec) + .await + } + } + + async fn collect_all_concurrently( + &self, + streams: Vec, + reservations: Vec, + metrics_vec: Vec, + ) -> Result> { + // Spawn a task for each stream to scan all streams concurrently let mut join_set = JoinSet::new(); for (partition_id, ((stream, metrics), reservation)) in streams .into_iter() @@ -147,8 +160,13 @@ impl BuildSideBatchesCollector { .enumerate() { let collector = self.clone(); + let evaluator = Arc::clone(&self.evaluator); join_set.spawn(async move { - let result = collector.collect(stream, reservation, &metrics).await; + let evaluated_stream = + create_evaluated_build_stream(stream, evaluator, metrics.time_taken.clone()); + let result = collector + .collect(evaluated_stream, reservation, &metrics) + .await; (partition_id, result) }); } @@ -168,4 +186,26 @@ impl BuildSideBatchesCollector { Ok(partitions.into_iter().map(|v| v.unwrap()).collect()) } + + async fn collect_all_sequential( + &self, + streams: Vec, + reservations: Vec, + metrics_vec: Vec, + ) -> Result> { + // Collect partitions sequentially (for JNI/embedded contexts) + let mut results = Vec::with_capacity(streams.len()); + for ((stream, metrics), reservation) in + streams.into_iter().zip(metrics_vec).zip(reservations) + { + let evaluator = Arc::clone(&self.evaluator); + let evaluated_stream = + create_evaluated_build_stream(stream, evaluator, metrics.time_taken.clone()); + let result = self + .collect(evaluated_stream, reservation, &metrics) + .await?; + results.push(result); + } + Ok(results) + } } diff --git a/rust/sedona-spatial-join/src/stream.rs b/rust/sedona-spatial-join/src/stream.rs index 37a84523d..f4b182445 100644 --- a/rust/sedona-spatial-join/src/stream.rs +++ b/rust/sedona-spatial-join/src/stream.rs @@ -33,9 +33,11 @@ use std::collections::HashMap; use std::ops::Range; use std::sync::Arc; +use crate::evaluated_batch::evaluated_batch_stream::evaluate::create_evaluated_probe_stream; +use crate::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream; use crate::evaluated_batch::EvaluatedBatch; use crate::index::SpatialIndex; -use crate::operand_evaluator::{create_operand_evaluator, distance_value_at, OperandEvaluator}; +use crate::operand_evaluator::{create_operand_evaluator, distance_value_at}; use crate::spatial_predicate::SpatialPredicate; use crate::utils::join_utils::{ adjust_indices_by_join_type, apply_join_filter_to_indices, build_batch_from_indices, @@ -55,7 +57,7 @@ pub(crate) struct SpatialJoinStream { /// type of the join join_type: JoinType, /// The stream of the probe side - probe_stream: SendableRecordBatchStream, + probe_stream: SendableEvaluatedBatchStream, /// Information of index and left / right placement of columns column_indices: Vec, /// Maintains the order of the probe side @@ -76,8 +78,6 @@ pub(crate) struct SpatialJoinStream { once_async_spatial_index: Arc>>>, /// The spatial index spatial_index: Option>, - /// The `on` spatial predicate evaluator - evaluator: Arc, /// The spatial predicate being evaluated spatial_predicate: SpatialPredicate, } @@ -99,6 +99,11 @@ impl SpatialJoinStream { once_async_spatial_index: Arc>>>, ) -> Self { let evaluator = create_operand_evaluator(on, options.clone()); + let probe_stream = create_evaluated_probe_stream( + probe_stream, + Arc::clone(&evaluator), + join_metrics.join_time.clone(), + ); Self { schema, filter, @@ -113,7 +118,6 @@ impl SpatialJoinStream { once_fut_spatial_index, once_async_spatial_index, spatial_index: None, - evaluator, spatial_predicate: on.clone(), } } @@ -386,9 +390,9 @@ impl SpatialJoinStream { fn create_spatial_join_iterator( &self, - probe_batch: RecordBatch, + probe_evaluated_batch: EvaluatedBatch, ) -> Result { - let num_rows = probe_batch.num_rows(); + let num_rows = probe_evaluated_batch.num_rows(); self.join_metrics.probe_input_batches.add(1); self.join_metrics.probe_input_rows.add(num_rows); @@ -398,13 +402,11 @@ impl SpatialJoinStream { .as_ref() .expect("Spatial index should be available"); - // Evaluate the probe side geometry expression to get geometry array - let geom_array = self.evaluator.evaluate_probe(&probe_batch)?; - // Update the probe side statistics, which may help the spatial index to select a better // execution mode for evaluating the spatial predicate. if spatial_index.need_more_probe_stats() { let mut analyzer = AnalyzeAccumulator::new(WKB_GEOMETRY, WKB_GEOMETRY); + let geom_array = &probe_evaluated_batch.geom_array; for wkb in geom_array.wkbs().iter().flatten() { analyzer.update_statistics(wkb, wkb.buf().len())?; } @@ -414,10 +416,7 @@ impl SpatialJoinStream { SpatialJoinBatchIterator::new(SpatialJoinBatchIteratorParams { spatial_index: spatial_index.clone(), - probe_evaluated_batch: EvaluatedBatch { - batch: probe_batch, - geom_array, - }, + probe_evaluated_batch, join_metrics: self.join_metrics.clone(), max_batch_size: self.target_output_batch_size, probe_side_ordered: self.probe_side_ordered, diff --git a/rust/sedona-spatial-join/src/utils/arrow_utils.rs b/rust/sedona-spatial-join/src/utils/arrow_utils.rs index c8c5779b6..367568fc2 100644 --- a/rust/sedona-spatial-join/src/utils/arrow_utils.rs +++ b/rust/sedona-spatial-join/src/utils/arrow_utils.rs @@ -15,10 +15,149 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::{Array, ArrayData, RecordBatch}; +use std::sync::Arc; + +use arrow::array::{Array, ArrayData, BinaryViewArray, ListArray, RecordBatch, StringViewArray}; +use arrow_array::make_array; use arrow_array::ArrayRef; +use arrow_array::StructArray; use arrow_schema::{ArrowError, DataType}; use datafusion_common::Result; +use sedona_common::sedona_internal_err; + +/// Reconstruct `batch` to organize the payload buffers of each `StringViewArray` and +/// `BinaryViewArray` in sequential order by calling `gc()` on them. +/// +/// Note this is a workaround until is +/// available. +/// +/// # Rationale +/// +/// The `interleave` kernel does not reconstruct the inner buffers of view arrays by default, +/// leading to non-sequential payload locations. A single payload buffer might be shared by +/// multiple `RecordBatch`es or multiple rows in the same batch might reference scattered +/// locations in a large buffer. +/// +/// When writing each batch to disk, the writer has to write all referenced buffers. This +/// causes extra disk reads and writes, and potentially execution failure (e.g. No space left +/// on device). +/// +/// # Example +/// +/// Before interleaving: +/// batch1 -> buffer1 (large) +/// batch2 -> buffer2 (large) +/// +/// interleaved_batch -> buffer1 (sparse access) +/// -> buffer2 (sparse access) +/// +/// Then when spilling the interleaved batch, the writer has to write both buffer1 and buffer2 +/// entirely, even if only a few bytes are used. +pub(crate) fn compact_batch(batch: RecordBatch) -> Result { + let mut new_columns: Vec> = Vec::with_capacity(batch.num_columns()); + let mut arr_mutated = false; + + for array in batch.columns() { + let (new_array, mutated) = compact_array(Arc::clone(array))?; + new_columns.push(new_array); + arr_mutated |= mutated; + } + + if arr_mutated { + Ok(RecordBatch::try_new(batch.schema(), new_columns)?) + } else { + Ok(batch) + } +} + +/// Recursively compacts view arrays in `array` by calling `gc()` on them. +/// Returns a tuple of the potentially new array and a boolean indicating +/// whether any compaction was performed. +pub(crate) fn compact_array(array: ArrayRef) -> Result<(ArrayRef, bool)> { + if let Some(view_array) = array.as_any().downcast_ref::() { + return Ok((Arc::new(view_array.gc()), true)); + } + if let Some(view_array) = array.as_any().downcast_ref::() { + return Ok((Arc::new(view_array.gc()), true)); + } + + // Fast path for non-nested arrays + if !array.data_type().is_nested() { + return Ok((array, false)); + } + + // Avoid ArrayData -> ArrayRef roundtrips for commonly used data types, + // including StructArray and ListArray. + + if let Some(struct_array) = array.as_any().downcast_ref::() { + let mut mutated = false; + let mut new_columns: Vec = Vec::with_capacity(struct_array.num_columns()); + for col in struct_array.columns() { + let (new_col, col_mutated) = compact_array(Arc::clone(col))?; + mutated |= col_mutated; + new_columns.push(new_col); + } + + if !mutated { + return Ok((array, false)); + } + + let rebuilt = StructArray::new( + struct_array.fields().clone(), + new_columns, + struct_array.nulls().cloned(), + ); + return Ok((Arc::new(rebuilt), true)); + } + + if let Some(list_array) = array.as_any().downcast_ref::() { + let (new_values, mutated) = compact_array(list_array.values().clone())?; + if !mutated { + return Ok((array, false)); + } + + let DataType::List(field) = list_array.data_type() else { + // Defensive: this downcast should only succeed for DataType::List. + return sedona_internal_err!( + "ListArray has non-List data type: {:?}", + list_array.data_type() + ); + }; + + let rebuilt = ListArray::new( + Arc::clone(field), + list_array.offsets().clone(), + new_values, + list_array.nulls().cloned(), + ); + return Ok((Arc::new(rebuilt), true)); + } + + // For nested arrays (Map/Dictionary/etc.), recurse into children via ArrayData. + let data = array.to_data(); + if data.child_data().is_empty() { + return Ok((array, false)); + } + + let mut mutated = false; + let mut new_child_data = Vec::with_capacity(data.child_data().len()); + for child in data.child_data().iter() { + let child_array = make_array(child.clone()); + let (new_child_array, child_mutated) = compact_array(child_array)?; + mutated |= child_mutated; + new_child_data.push(new_child_array.to_data()); + } + + if !mutated { + return Ok((array, false)); + } + + // Rebuild this array with identical buffers/nulls but replaced child_data. + let mut builder = data.into_builder(); + builder = builder.child_data(new_child_data); + let new_data = builder.build()?; + Ok((make_array(new_data), true)) +} /// Estimate the in-memory size of a given RecordBatch. This function estimates the /// size as if the underlying buffers were copied to somewhere else and not shared. @@ -101,10 +240,13 @@ fn get_binary_view_value_size(array_data: &ArrayData) -> Result with many long values. Then slice the list so it contains + // only one row; `compact_array` should compact the nested BinaryView values. + let n = 256; + let long = b"Long string that is definitely longer than 12 bytes"; + + let mut bv_list_builder = ListBuilder::new(BinaryViewBuilder::new()); + for i in 0..n { + bv_list_builder + .values() + .append_value([long, i.to_string().as_bytes()].concat()); + bv_list_builder.append(true); + } + let bv_list: ListArray = bv_list_builder.finish(); + let sliced: ArrayRef = Arc::new(bv_list.slice(0, 1)); + let before_size = get_array_memory_size(&sliced).unwrap(); + + let (compacted, mutated) = compact_array(Arc::clone(&sliced)).unwrap(); + assert!(mutated); + + let after_size = get_array_memory_size(&compacted).unwrap(); + assert!(after_size <= before_size); + } + + #[test] + fn test_compact_array_list_without_view_is_noop() { + let i32_list: ListArray = ListArray::from_iter_primitive::([ + Some(vec![Some(1), Some(2), Some(3)]), + Some(vec![Some(4)]), + ]); + + let array: ArrayRef = Arc::new(i32_list); + let (compacted, mutated) = compact_array(Arc::clone(&array)).unwrap(); + assert!(!mutated); + assert!(Arc::ptr_eq(&array, &compacted)); + } + + #[test] + fn test_compact_array_struct_without_view_is_noop() { + let i32_values = Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3])); + let bool_values = Arc::new(BooleanArray::from(vec![true, false, true])); + let i32_list: ArrayRef = Arc::new(ListArray::from_iter_primitive::([ + Some(vec![Some(1), Some(2)]), + Some(vec![Some(3)]), + None, + ])); + + let struct_array = StructArray::from(vec![ + ( + Arc::new(Field::new("a", DataType::Int32, false)), + i32_values as ArrayRef, + ), + ( + Arc::new(Field::new("b", DataType::Boolean, false)), + bool_values as ArrayRef, + ), + ( + Arc::new(Field::new("c", i32_list.data_type().clone(), true)), + i32_list, + ), + ]); + + let array: ArrayRef = Arc::new(struct_array); + let (compacted, mutated) = compact_array(Arc::clone(&array)).unwrap(); + assert!(!mutated); + assert!(Arc::ptr_eq(&array, &compacted)); + } }