Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions rust/sedona-spatial-join/src/evaluated_batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
// under the License.

use arrow_array::RecordBatch;
use datafusion_common::Result;
use datafusion_expr::ColumnarValue;
use geo::Rect;
use wkb::reader::Wkb;

use crate::operand_evaluator::EvaluatedGeometryArray;
use crate::{
operand_evaluator::EvaluatedGeometryArray, utils::arrow_utils::get_record_batch_memory_size,
};

/// EvaluatedBatch contains the original record batch from the input stream and the evaluated
/// geometry array.
Expand All @@ -34,12 +37,14 @@ pub(crate) struct EvaluatedBatch {
}

impl EvaluatedBatch {
pub fn in_mem_size(&self) -> usize {
pub fn in_mem_size(&self) -> Result<usize> {
// NOTE: sometimes `geom_array` will reuse the memory of `batch`, especially when
// the expression for evaluating the geometry is a simple column reference. In this case,
// the in_mem_size will be overestimated. It is a conservative estimation so there's no risk
// of running out of memory because of underestimation.
self.batch.get_array_memory_size() + self.geom_array.in_mem_size()
let record_batch_size = get_record_batch_memory_size(&self.batch)?;
let geom_array_size = self.geom_array.in_mem_size()?;
Ok(record_batch_size + geom_array_size)
}

pub fn num_rows(&self) -> usize {
Expand Down
2 changes: 1 addition & 1 deletion rust/sedona-spatial-join/src/index/build_side_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl BuildSideBatchesCollector {
geom_array,
};

let in_mem_size = build_side_batch.in_mem_size();
let in_mem_size = build_side_batch.in_mem_size()?;
metrics.num_batches.add(1);
metrics.num_rows.add(build_side_batch.num_rows());
metrics.total_size_bytes.add(in_mem_size);
Expand Down
20 changes: 10 additions & 10 deletions rust/sedona-spatial-join/src/index/spatial_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch, &WKB_GEOMETRY).unwrap(),
};
builder.add_batch(indexed_batch);
builder.add_batch(indexed_batch).unwrap();

let index = builder.finish().unwrap();
assert_eq!(index.schema(), schema);
Expand Down Expand Up @@ -663,7 +663,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch, &WKB_GEOMETRY).unwrap(),
};
builder.add_batch(indexed_batch);
builder.add_batch(indexed_batch).unwrap();

let index = builder.finish().unwrap();

Expand Down Expand Up @@ -764,7 +764,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch, &WKB_GEOMETRY).unwrap(),
};
builder.add_batch(indexed_batch);
builder.add_batch(indexed_batch).unwrap();

let index = builder.finish().unwrap();

Expand Down Expand Up @@ -850,7 +850,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch, &WKB_GEOMETRY).unwrap(),
};
builder.add_batch(indexed_batch);
builder.add_batch(indexed_batch).unwrap();

let index = builder.finish().unwrap();

Expand Down Expand Up @@ -946,7 +946,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch, &WKB_GEOMETRY).unwrap(),
};
builder.add_batch(indexed_batch);
builder.add_batch(indexed_batch).unwrap();

let index = builder.finish().unwrap();

Expand Down Expand Up @@ -1092,7 +1092,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch, &WKB_GEOMETRY).unwrap(),
};
builder.add_batch(indexed_batch);
builder.add_batch(indexed_batch).unwrap();

let index = builder.finish().unwrap();

Expand Down Expand Up @@ -1206,7 +1206,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch, &WKB_GEOMETRY).unwrap(),
};
builder.add_batch(indexed_batch);
builder.add_batch(indexed_batch).unwrap();

let index = builder.finish().unwrap();

Expand Down Expand Up @@ -1291,7 +1291,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch, &WKB_GEOMETRY).unwrap(),
};
builder.add_batch(indexed_batch);
builder.add_batch(indexed_batch).unwrap();

let index = builder.finish().unwrap();

Expand Down Expand Up @@ -1377,7 +1377,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch, &WKB_GEOMETRY).unwrap(),
};
builder.add_batch(indexed_batch);
builder.add_batch(indexed_batch).unwrap();

let index = builder.finish().unwrap();

Expand Down Expand Up @@ -1493,7 +1493,7 @@ mod tests {
batch,
geom_array: EvaluatedGeometryArray::try_new(geom_batch, &WKB_GEOMETRY).unwrap(),
};
builder.add_batch(indexed_batch);
builder.add_batch(indexed_batch).unwrap();

let index = builder.finish().unwrap();

Expand Down
7 changes: 4 additions & 3 deletions rust/sedona-spatial-join/src/index/spatial_index_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,12 @@ impl SpatialIndexBuilder {
///
/// This method accumulates geometry batches that will be used to build the spatial index.
/// Each batch contains processed geometry data along with memory usage information.
pub fn add_batch(&mut self, indexed_batch: EvaluatedBatch) {
let in_mem_size = indexed_batch.in_mem_size();
pub fn add_batch(&mut self, indexed_batch: EvaluatedBatch) -> Result<()> {
let in_mem_size = indexed_batch.in_mem_size()?;
self.indexed_batches.push(indexed_batch);
self.reservation.grow(in_mem_size);
self.metrics.build_mem_used.add(in_mem_size);
Ok(())
}

pub fn merge_stats(&mut self, stats: GeoStatistics) -> &mut Self {
Expand Down Expand Up @@ -298,7 +299,7 @@ impl SpatialIndexBuilder {
let mut stream = partition.build_side_batch_stream;
while let Some(batch) = stream.next().await {
let indexed_batch = batch?;
self.add_batch(indexed_batch);
self.add_batch(indexed_batch)?;
}
self.merge_stats(partition.geo_statistics);
let mem_bytes = partition.reservation.free();
Expand Down
16 changes: 8 additions & 8 deletions rust/sedona-spatial-join/src/operand_evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ use wkb::reader::Wkb;

use sedona_common::option::SpatialJoinOptions;

use crate::spatial_predicate::{
DistancePredicate, KNNPredicate, RelationPredicate, SpatialPredicate,
use crate::{
spatial_predicate::{DistancePredicate, KNNPredicate, RelationPredicate, SpatialPredicate},
utils::arrow_utils::get_array_memory_size,
};

/// Operand evaluator is for evaluating the operands of a spatial predicate. It can be a distance
Expand Down Expand Up @@ -154,20 +155,19 @@ impl EvaluatedGeometryArray {
&self.wkbs
}

pub fn in_mem_size(&self) -> usize {
pub fn in_mem_size(&self) -> Result<usize> {
let geom_array_size = get_array_memory_size(&self.geometry_array)?;

let distance_in_mem_size = match &self.distance {
Some(ColumnarValue::Array(array)) => array.get_array_memory_size(),
Some(ColumnarValue::Array(array)) => get_array_memory_size(array)?,
_ => 8,
};

// Note: this is not an accurate, because wkbs has inner Vecs. However, the size of inner vecs
// should be small, so the inaccuracy does not matter too much.
let wkb_vec_size = self.wkbs.allocated_size();

self.geometry_array.get_array_memory_size()
+ self.rects.allocated_size()
+ distance_in_mem_size
+ wkb_vec_size
Ok(geom_array_size + self.rects.allocated_size() + distance_in_mem_size + wkb_vec_size)
}
}

Expand Down
1 change: 1 addition & 0 deletions rust/sedona-spatial-join/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

pub(crate) mod arrow_utils;
pub(crate) mod bbox_sampler;
pub(crate) mod concurrent_reservation;
pub(crate) mod init_once_array;
Expand Down
Loading