Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions rust/sedona-geometry/src/wkb_factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,38 @@ pub fn wkb_polygon<I: ExactSizeIterator<Item = (f64, f64)>>(
Ok(out_wkb)
}

/// Create WKB representing a rectangle
///
/// Returns WKB for a polygon with a single closed ring describing
/// the rectangle bounds.
/// This function always writes little endian coordinates.
pub fn wkb_rect(
min_x: f64,
min_y: f64,
max_x: f64,
max_y: f64,
) -> Result<Vec<u8>, SedonaGeometryError> {
if min_x > max_x {
return Err(SedonaGeometryError::Invalid(
"min_x must be <= max_x".to_string(),
));
}
if min_y > max_y {
return Err(SedonaGeometryError::Invalid(
"min_y must be <= max_y".to_string(),
));
}

let coords = [
(min_x, min_y),
(max_x, min_y),
(max_x, max_y),
(min_x, max_y),
(min_x, min_y),
];
wkb_polygon(coords.into_iter())
}

/// Write WKB representing a POLYGON into a buffer
///
/// This can be used to build Binary arrays, as the arrow-rs BinaryBuilder
Expand Down Expand Up @@ -721,6 +753,23 @@ mod test {
);
}

#[test]
fn test_wkb_rect() {
let wkb = wkb_rect(0.0, 0.0, 2.0, 1.0).unwrap();
check_bytes(&wkb, "POLYGON((0 0,2 0,2 1,0 1,0 0))");

let invalid_cases = [
(2.0, 0.0, 1.0, 1.0),
(0.0, 2.0, 1.0, 1.0),
(2.0, 2.0, 1.0, 1.0),
(1.0, 2.0, 0.0, 1.0),
];

for (min_x, min_y, max_x, max_y) in invalid_cases {
assert!(wkb_rect(min_x, min_y, max_x, max_y).is_err());
}
}

#[test]
fn test_wkb_polygon_header() {
let mut wkb = Vec::new();
Expand Down
26 changes: 2 additions & 24 deletions rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ mod tests {
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use sedona_geometry::bounding_box::BoundingBox;
use sedona_geometry::interval::IntervalTrait;
use sedona_geometry::wkb_factory::wkb_point;
use sedona_geometry::wkb_factory::{wkb_point, wkb_rect};
use sedona_schema::datatypes::WKB_GEOMETRY;

use crate::{
Expand Down Expand Up @@ -792,28 +792,6 @@ mod tests {
})
}

fn rect_wkb(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> Vec<u8> {
assert!(min_x <= max_x, "min_x must be <= max_x");
assert!(min_y <= max_y, "min_y must be <= max_y");
let mut buf = Vec::with_capacity(1 + 4 + 4 + 4 + 5 * 16);
buf.push(1u8); // little endian
buf.extend_from_slice(&3u32.to_le_bytes()); // polygon type
buf.extend_from_slice(&1u32.to_le_bytes()); // single ring
buf.extend_from_slice(&5u32.to_le_bytes()); // five coordinates (closed ring)
let coords = [
(min_x, min_y),
(max_x, min_y),
(max_x, max_y),
(min_x, max_y),
(min_x, min_y),
];
for (x, y) in coords {
buf.extend_from_slice(&x.to_le_bytes());
buf.extend_from_slice(&y.to_le_bytes());
}
buf
}

fn read_ids(file: &RefCountedTempFile) -> Result<Vec<i32>> {
let mut reader = EvaluatedBatchSpillReader::try_new(file)?;
let mut ids = Vec::new();
Expand Down Expand Up @@ -938,7 +916,7 @@ mod tests {

#[tokio::test]
async fn repartition_multi_and_none() -> Result<()> {
let wkbs = vec![Some(rect_wkb(25.0, 0.0, 75.0, 20.0)), None];
let wkbs = vec![Some(wkb_rect(25.0, 0.0, 75.0, 20.0).unwrap()), None];
let batch = sample_batch(&[0, 1], wkbs)?;
let schema = batch.schema();
let stream: SendableEvaluatedBatchStream =
Expand Down
24 changes: 2 additions & 22 deletions rust/sedona-spatial-join/src/probe/partitioned_stream_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ mod tests {
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use futures::TryStreamExt;
use sedona_geometry::bounding_box::BoundingBox;
use sedona_geometry::wkb_factory::wkb_point;
use sedona_geometry::wkb_factory::{wkb_point, wkb_rect};
use sedona_schema::datatypes::WKB_GEOMETRY;
use std::sync::Arc;

Expand All @@ -365,26 +365,6 @@ mod tests {
Ok(EvaluatedBatch { batch, geom_array })
}

fn rect_wkb(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> Vec<u8> {
let mut buf = Vec::with_capacity(1 + 4 + 4 + 4 + 5 * 16);
buf.push(1u8);
buf.extend_from_slice(&3u32.to_le_bytes());
buf.extend_from_slice(&1u32.to_le_bytes());
buf.extend_from_slice(&5u32.to_le_bytes());
let coords = [
(min_x, min_y),
(max_x, min_y),
(max_x, max_y),
(min_x, max_y),
(min_x, min_y),
];
for (x, y) in coords {
buf.extend_from_slice(&x.to_le_bytes());
buf.extend_from_slice(&y.to_le_bytes());
}
buf
}

fn ids_from_batches(batches: &[EvaluatedBatch]) -> Vec<Vec<i32>> {
batches
.iter()
Expand Down Expand Up @@ -485,7 +465,7 @@ mod tests {
#[tokio::test]
async fn multi_partition_stream_consumed_multiple_times() -> Result<()> {
let partitioner = sample_partitioner()?;
let batch = sample_batch(&[0], vec![Some(rect_wkb(25.0, 0.0, 75.0, 20.0))])?;
let batch = sample_batch(&[0], vec![Some(wkb_rect(25.0, 0.0, 75.0, 20.0).unwrap())])?;
let probe_stream = create_probe_stream(vec![batch], Some(partitioner));

let first_pass = probe_stream.stream_for(SpatialPartition::Regular(0))?;
Expand Down