diff --git a/c/sedona-geos/benches/geos-functions.rs b/c/sedona-geos/benches/geos-functions.rs index 13c2040f9..74152c541 100644 --- a/c/sedona-geos/benches/geos-functions.rs +++ b/c/sedona-geos/benches/geos-functions.rs @@ -46,6 +46,8 @@ fn criterion_benchmark(c: &mut Criterion) { benchmark::scalar(c, &f, "geos", "st_centroid", Polygon(10)); benchmark::scalar(c, &f, "geos", "st_centroid", Polygon(500)); + benchmark::scalar(c, &f, "geos", "st_convexhull", MultiPoint(10)); + benchmark::scalar( c, &f, diff --git a/c/sedona-geos/src/lib.rs b/c/sedona-geos/src/lib.rs index 5e3471000..667bc8234 100644 --- a/c/sedona-geos/src/lib.rs +++ b/c/sedona-geos/src/lib.rs @@ -23,6 +23,7 @@ pub mod register; mod st_area; mod st_buffer; mod st_centroid; +mod st_convexhull; mod st_dwithin; mod st_length; mod st_perimeter; diff --git a/c/sedona-geos/src/register.rs b/c/sedona-geos/src/register.rs index cd7e53635..15e18c662 100644 --- a/c/sedona-geos/src/register.rs +++ b/c/sedona-geos/src/register.rs @@ -16,6 +16,7 @@ // under the License. use sedona_expr::scalar_udf::ScalarKernelRef; +use crate::st_convexhull::st_convex_hull_impl; use crate::{ distance::st_distance_impl, st_area::st_area_impl, st_buffer::st_buffer_impl, st_centroid::st_centroid_impl, st_dwithin::st_dwithin_impl, st_length::st_length_impl, @@ -37,6 +38,7 @@ pub fn scalar_kernels() -> Vec<(&'static str, ScalarKernelRef)> { ("st_buffer", st_buffer_impl()), ("st_centroid", st_centroid_impl()), ("st_contains", st_contains_impl()), + ("st_convexhull", st_convex_hull_impl()), ("st_coveredby", st_covered_by_impl()), ("st_covers", st_covers_impl()), ("st_difference", st_difference_impl()), diff --git a/c/sedona-geos/src/st_convexhull.rs b/c/sedona-geos/src/st_convexhull.rs new file mode 100644 index 000000000..b38613611 --- /dev/null +++ b/c/sedona-geos/src/st_convexhull.rs @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use std::sync::Arc; + +use arrow_array::builder::BinaryBuilder; +use datafusion_common::{error::Result, DataFusionError}; +use datafusion_expr::ColumnarValue; +use geos::Geom; +use sedona_expr::scalar_udf::{ArgMatcher, ScalarKernelRef, SedonaScalarKernel}; +use sedona_geometry::wkb_factory::WKB_MIN_PROBABLE_BYTES; +use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY}; + +use crate::executor::GeosExecutor; + +/// ST_ConvexHull() implementation using the geos crate +pub fn st_convex_hull_impl() -> ScalarKernelRef { + Arc::new(STConvexHull {}) +} + +#[derive(Debug)] +struct STConvexHull {} + +impl SedonaScalarKernel for STConvexHull { + fn return_type(&self, args: &[SedonaType]) -> Result> { + let matcher = ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY); + + matcher.match_args(args) + } + + fn invoke_batch( + &self, + arg_types: &[SedonaType], + args: &[ColumnarValue], + ) -> Result { + let executor = GeosExecutor::new(arg_types, args); + let mut builder = BinaryBuilder::with_capacity( + executor.num_iterations(), + WKB_MIN_PROBABLE_BYTES * executor.num_iterations(), + ); + executor.execute_wkb_void(|maybe_wkb| { + match maybe_wkb { + Some(wkb) => { + invoke_scalar(&wkb, &mut builder)?; + builder.append_value([]); + } + _ => builder.append_null(), + } + + 
Ok(()) + })?; + + executor.finish(Arc::new(builder.finish())) + } +} + +fn invoke_scalar(geos_geom: &geos::Geometry, writer: &mut impl std::io::Write) -> Result<()> { + let geometry = geos_geom + .convex_hull() + .map_err(|e| DataFusionError::Execution(format!("Failed to calculate convex_hull: {e}")))?; + + let wkb = geometry + .to_wkb() + .map_err(|e| DataFusionError::Execution(format!("Failed to convert to wkb: {e}")))?; + + writer.write_all(wkb.as_ref())?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use datafusion_common::ScalarValue; + use rstest::rstest; + use sedona_expr::scalar_udf::SedonaScalarUDF; + use sedona_schema::datatypes::{WKB_GEOMETRY, WKB_VIEW_GEOMETRY}; + use sedona_testing::compare::assert_array_equal; + use sedona_testing::create::create_array; + use sedona_testing::testers::ScalarUdfTester; + + use super::*; + + #[rstest] + fn udf(#[values(WKB_GEOMETRY, WKB_VIEW_GEOMETRY)] sedona_type: SedonaType) { + let udf = SedonaScalarUDF::from_kernel("st_convex_hull", st_convex_hull_impl()); + let tester = ScalarUdfTester::new(udf.into(), vec![sedona_type]); + tester.assert_return_type(WKB_GEOMETRY); + + let result = tester + .invoke_scalar("MULTIPOINT ((0 0), (0 1), (1 1), (1 0))") + .unwrap(); + tester.assert_scalar_result_equals(result, "POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))"); + + let result = tester.invoke_scalar(ScalarValue::Null).unwrap(); + assert!(result.is_null()); + + let input_wkt = vec![ + Some("MULTIPOINT ((0 0), (0 1), (1 1), (1 0))"), + Some("POINT EMPTY"), + None, + ]; + + let expected = create_array( + &[ + Some("POLYGON ((0 0, 0 1, 1 1, 1 0, 0 0))"), + Some("GEOMETRYCOLLECTION EMPTY"), + None, + ], + &WKB_GEOMETRY, + ); + assert_array_equal(&tester.invoke_wkb_array(input_wkt).unwrap(), &expected); + } +} diff --git a/python/sedonadb/tests/functions/test_aggregate.py b/python/sedonadb/tests/functions/test_aggregate.py new file mode 100644 index 000000000..15defddfe --- /dev/null +++ b/python/sedonadb/tests/functions/test_aggregate.py @@ 
-0,0 +1,117 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import pytest +from sedonadb.testing import PostGIS, SedonaDB + + +@pytest.mark.parametrize("eng", [SedonaDB, PostGIS]) +def test_st_collect_points(eng): + eng = eng.create_or_skip() + eng.assert_query_result( + """SELECT ST_Collect(ST_GeomFromText(geom)) FROM ( + VALUES + ('POINT (1 2)'), + ('POINT (3 4)'), + (NULL) + ) AS t(geom)""", + "MULTIPOINT (1 2, 3 4)", + ) + + +@pytest.mark.parametrize("eng", [SedonaDB, PostGIS]) +def test_st_collect_linestrings(eng): + eng = eng.create_or_skip() + eng.assert_query_result( + """SELECT ST_Collect(ST_GeomFromText(geom)) FROM ( + VALUES + ('LINESTRING (1 2, 3 4)'), + ('LINESTRING (5 6, 7 8)'), + (NULL) + ) AS t(geom)""", + "MULTILINESTRING ((1 2, 3 4), (5 6, 7 8))", + ) + + +@pytest.mark.parametrize("eng", [SedonaDB, PostGIS]) +def test_st_collect_polygons(eng): + eng = eng.create_or_skip() + eng.assert_query_result( + """SELECT ST_Collect(ST_GeomFromText(geom)) FROM ( + VALUES + ('POLYGON ((0 0, 1 0, 0 1, 0 0))'), + ('POLYGON ((10 10, 11 10, 10 11, 10 10))'), + (NULL) + ) AS t(geom)""", + "MULTIPOLYGON (((0 0, 1 0, 0 1, 0 0)), ((10 10, 11 10, 10 11, 10 10)))", + ) + + +@pytest.mark.parametrize("eng", [SedonaDB, PostGIS]) +def 
test_st_collect_mixed_types(eng): + eng = eng.create_or_skip() + eng.assert_query_result( + """SELECT ST_Collect(ST_GeomFromText(geom)) FROM ( + VALUES + ('POINT (1 2)'), + ('LINESTRING (3 4, 5 6)'), + (NULL) + ) AS t(geom)""", + "GEOMETRYCOLLECTION (POINT (1 2), LINESTRING (3 4, 5 6))", + ) + + +@pytest.mark.parametrize("eng", [SedonaDB, PostGIS]) +def test_st_collect_mixed_dimensions(eng): + eng = eng.create_or_skip() + + with pytest.raises(Exception, match="mixed dimension geometries"): + eng.assert_query_result( + """SELECT ST_Collect(ST_GeomFromText(geom)) FROM ( + VALUES + ('POINT (1 2)'), + ('POINT Z (3 4 5)'), + (NULL) + ) AS t(geom)""", + "MULTIPOINT (1 2, 3 4)", + ) + + +@pytest.mark.parametrize("eng", [SedonaDB, PostGIS]) +def test_st_collect_all_null(eng): + eng = eng.create_or_skip() + eng.assert_query_result( + """SELECT ST_Collect(geom) FROM ( + VALUES + (NULL), + (NULL), + (NULL) + ) AS t(geom)""", + None, + ) + + +@pytest.mark.parametrize("eng", [SedonaDB, PostGIS]) +def test_st_collect_zero_input(eng): + eng = eng.create_or_skip() + eng.assert_query_result( + """SELECT ST_Collect(ST_GeomFromText(geom)) AS empty FROM ( + VALUES + ('POINT (1 2)') + ) AS t(geom) WHERE false""", + None, + ) diff --git a/python/sedonadb/tests/functions/test_functions.py b/python/sedonadb/tests/functions/test_functions.py index 7e3e1e3c9..bf2b8f925 100644 --- a/python/sedonadb/tests/functions/test_functions.py +++ b/python/sedonadb/tests/functions/test_functions.py @@ -188,6 +188,40 @@ def test_st_centroid(eng, geom, expected): eng.assert_query_result(f"SELECT ST_Centroid({geom_or_null(geom)})", expected) +@pytest.mark.parametrize("eng", [SedonaDB, PostGIS]) +@pytest.mark.parametrize( + ("geom", "expected"), + [ + (None, None), + ("POINT (0 0)", "POINT (0 0)"), + ("MULTIPOINT (0 0, 1 1)", "LINESTRING (0 0, 1 1)"), + ("MULTIPOINT (0 0, 1 1, 1 0)", "POLYGON ((0 0, 1 1, 1 0, 0 0))"), + ("MULTIPOINT (0 0, 1 1, 1 0, 0.5 0.25)", "POLYGON ((0 0, 1 1, 1 0, 0 0))"), + ], +) +def 
test_st_convexhull(eng, geom, expected): + eng = eng.create_or_skip() + eng.assert_query_result(f"SELECT ST_ConvexHull({geom_or_null(geom)})", expected) + + +@pytest.mark.parametrize("eng", [SedonaDB, PostGIS]) +def test_st_makeline(eng): + eng = eng.create_or_skip() + eng.assert_query_result( + "SELECT ST_MakeLine(ST_Point(0, 1), ST_Point(2, 3))", "LINESTRING (0 1, 2 3)" + ) + + eng.assert_query_result( + "SELECT ST_MakeLine(ST_Point(0, 1), ST_GeomFromText('LINESTRING (0 1, 2 3)'))", + "LINESTRING (0 1, 2 3)", + ) + + eng.assert_query_result( + "SELECT ST_MakeLine(ST_Point(0, 1), ST_GeomFromText('LINESTRING (2 3, 4 5)'))", + "LINESTRING (0 1, 2 3, 4 5)", + ) + + @pytest.mark.parametrize("eng", [SedonaDB, PostGIS]) @pytest.mark.parametrize( ("geom", "expected"), diff --git a/rust/sedona-functions/benches/native-functions.rs b/rust/sedona-functions/benches/native-functions.rs index 0aa4c1a5d..3f9d70f0b 100644 --- a/rust/sedona-functions/benches/native-functions.rs +++ b/rust/sedona-functions/benches/native-functions.rs @@ -58,6 +58,23 @@ fn criterion_benchmark(c: &mut Criterion) { Transformed(LineString(10).into(), st_astext.clone()), ); + benchmark::scalar(c, &f, "native", "st_hasz", Point); + benchmark::scalar(c, &f, "native", "st_hasz", LineString(10)); + + benchmark::scalar(c, &f, "native", "st_hasm", Point); + benchmark::scalar(c, &f, "native", "st_hasm", LineString(10)); + + benchmark::scalar(c, &f, "native", "st_isempty", Point); + benchmark::scalar(c, &f, "native", "st_isempty", LineString(10)); + + benchmark::scalar( + c, + &f, + "native", + "st_makeline", + BenchmarkArgs::ArrayArray(Point, Point), + ); + benchmark::scalar( c, &f, @@ -103,15 +120,6 @@ fn criterion_benchmark(c: &mut Criterion) { ), ); - benchmark::scalar(c, &f, "native", "st_hasz", Point); - benchmark::scalar(c, &f, "native", "st_hasz", LineString(10)); - - benchmark::scalar(c, &f, "native", "st_hasm", Point); - benchmark::scalar(c, &f, "native", "st_hasm", LineString(10)); - - 
benchmark::scalar(c, &f, "native", "st_isempty", Point); - benchmark::scalar(c, &f, "native", "st_isempty", LineString(10)); - benchmark::scalar(c, &f, "native", "st_x", Point); benchmark::scalar(c, &f, "native", "st_y", Point); benchmark::scalar(c, &f, "native", "st_z", Point); @@ -132,6 +140,9 @@ fn criterion_benchmark(c: &mut Criterion) { benchmark::aggregate(c, &f, "native", "st_analyze_aggr", Point); benchmark::aggregate(c, &f, "native", "st_analyze_aggr", LineString(10)); + + benchmark::aggregate(c, &f, "native", "st_collect", Point); + benchmark::aggregate(c, &f, "native", "st_collect", LineString(10)); } criterion_group!(benches, criterion_benchmark); diff --git a/rust/sedona-functions/src/executor.rs b/rust/sedona-functions/src/executor.rs index 7f1aed261..081b6276f 100644 --- a/rust/sedona-functions/src/executor.rs +++ b/rust/sedona-functions/src/executor.rs @@ -17,6 +17,7 @@ use std::iter::zip; use arrow_array::ArrayRef; +use arrow_schema::DataType; use datafusion_common::cast::{as_binary_array, as_binary_view_array}; use datafusion_common::error::Result; use datafusion_common::{DataFusionError, ScalarValue}; @@ -314,7 +315,7 @@ impl IterGeo for ArrayRef { &'a self, sedona_type: &SedonaType, num_iterations: usize, - func: F, + mut func: F, ) -> Result<()> { if num_iterations != self.len() { return sedona_internal_err!( @@ -324,6 +325,13 @@ impl IterGeo for ArrayRef { } match sedona_type { + SedonaType::Arrow(DataType::Null) => { + for _ in 0..num_iterations { + func(None)?; + } + + Ok(()) + } SedonaType::Wkb(_, _) => iter_wkb_binary(as_binary_array(self)?, func), SedonaType::WkbView(_, _) => iter_wkb_binary(as_binary_view_array(self)?, func), _ => { diff --git a/rust/sedona-functions/src/lib.rs b/rust/sedona-functions/src/lib.rs index 1041fb163..b1425a585 100644 --- a/rust/sedona-functions/src/lib.rs +++ b/rust/sedona-functions/src/lib.rs @@ -28,6 +28,7 @@ mod st_asbinary; mod st_astext; mod st_buffer; mod st_centroid; +mod st_collect; mod st_dimension; 
mod st_dwithin; pub mod st_envelope; @@ -39,6 +40,7 @@ mod st_haszm; pub mod st_intersection_aggr; mod st_isempty; mod st_length; +mod st_makeline; mod st_perimeter; mod st_point; mod st_pointzm; diff --git a/rust/sedona-functions/src/register.rs b/rust/sedona-functions/src/register.rs index b5208ac8a..63836755c 100644 --- a/rust/sedona-functions/src/register.rs +++ b/rust/sedona-functions/src/register.rs @@ -78,6 +78,7 @@ pub fn default_function_set() -> FunctionSet { crate::st_haszm::st_hasz_udf, crate::st_isempty::st_isempty_udf, crate::st_length::st_length_udf, + crate::st_makeline::st_makeline_udf, crate::st_perimeter::st_perimeter_udf, crate::st_point::st_geogpoint_udf, crate::st_point::st_point_udf, @@ -104,6 +105,7 @@ pub fn default_function_set() -> FunctionSet { register_aggregate_udfs!( function_set, crate::st_analyze_aggr::st_analyze_aggr_udf, + crate::st_collect::st_collect_udf, crate::st_envelope_aggr::st_envelope_aggr_udf, crate::st_intersection_aggr::st_intersection_aggr_udf, crate::st_union_aggr::st_union_aggr_udf, diff --git a/rust/sedona-functions/src/sd_format.rs b/rust/sedona-functions/src/sd_format.rs index f0f9fd8ee..75dd6f45a 100644 --- a/rust/sedona-functions/src/sd_format.rs +++ b/rust/sedona-functions/src/sd_format.rs @@ -549,7 +549,7 @@ mod tests { ); let result = tester.invoke_array(test_array.clone()).unwrap(); if !matches!(expected_data_type, DataType::ListView(_)) { - assert_eq!(&result, &test_array, "Failed for test case: {description}",); + assert_eq!(&result, &test_array, "Failed for test case: {description}"); } } } diff --git a/rust/sedona-functions/src/st_collect.rs b/rust/sedona-functions/src/st_collect.rs new file mode 100644 index 000000000..33dcb13f0 --- /dev/null +++ b/rust/sedona-functions/src/st_collect.rs @@ -0,0 +1,347 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use std::{io::Write, sync::Arc, vec}; + +use crate::executor::WkbExecutor; +use arrow_array::ArrayRef; +use arrow_schema::{DataType, Field, FieldRef}; +use datafusion_common::{ + cast::{as_binary_array, as_int64_array, as_string_array}, + error::{DataFusionError, Result}, + exec_err, HashSet, ScalarValue, +}; +use datafusion_expr::{ + scalar_doc_sections::DOC_SECTION_OTHER, Accumulator, ColumnarValue, Documentation, Volatility, +}; +use geo_traits::Dimensions; +use sedona_common::sedona_internal_err; +use sedona_expr::{ + aggregate_udf::{SedonaAccumulator, SedonaAggregateUDF}, + scalar_udf::ArgMatcher, +}; +use sedona_geometry::{ + types::{GeometryTypeAndDimensions, GeometryTypeId}, + wkb_factory::{ + write_wkb_geometrycollection_header, write_wkb_multilinestring_header, + write_wkb_multipoint_header, write_wkb_multipolygon_header, + }, +}; +use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY}; + +/// ST_Collect() aggregate UDF implementation +/// +/// Combines the input geometries into a single multi-geometry or geometry collection. 
+pub fn st_collect_udf() -> SedonaAggregateUDF { + SedonaAggregateUDF::new( + "st_collect", + vec![Arc::new(STCollectAggr {})], + Volatility::Immutable, + Some(st_collect_doc()), + ) +} + +fn st_collect_doc() -> Documentation { + Documentation::builder( + DOC_SECTION_OTHER, + "Collect all geometries in geom into a single multi-geometry or geometry collection", + "ST_Collect (geom: Geometry)", + ) + .with_argument("geom", "geometry: Input geometry or geography") + .with_sql_example("SELECT ST_Collect(ST_GeomFromWKT('MULTIPOINT (0 1, 10 11)'))") + .build() +} + +#[derive(Debug)] +struct STCollectAggr {} + +impl SedonaAccumulator for STCollectAggr { + fn return_type(&self, args: &[SedonaType]) -> Result> { + let matcher = ArgMatcher::new(vec![ArgMatcher::is_geometry_or_geography()], WKB_GEOMETRY); + matcher.match_args(args) + } + + fn accumulator( + &self, + args: &[SedonaType], + output_type: &SedonaType, + ) -> Result> { + Ok(Box::new(CollectionAccumulator::try_new( + args[0].clone(), + output_type.clone(), + )?)) + } + + fn state_fields(&self, _args: &[SedonaType]) -> Result> { + Ok(vec![ + Arc::new(Field::new("unique_geometry_types", DataType::Utf8, false)), + Arc::new(Field::new("unique_dimensions", DataType::Utf8, false)), + Arc::new(Field::new("count", DataType::Int64, false)), + Arc::new(WKB_GEOMETRY.to_storage_field("item", true)?), + ]) + } +} + +#[derive(Debug)] +struct CollectionAccumulator { + input_type: SedonaType, + unique_geometry_types: HashSet, + unique_dimensions: HashSet, + count: i64, + item: Vec, +} + +impl CollectionAccumulator { + pub fn try_new(input_type: SedonaType, _output_type: SedonaType) -> Result { + Ok(Self { + input_type, + unique_geometry_types: HashSet::new(), + unique_dimensions: HashSet::new(), + count: 0, + item: Vec::new(), + }) + } + + // Create a WKB result based on the current state of the accumulator. 
+ fn make_wkb_result(&mut self) -> Result>> { + if self.count == 0 { + return Ok(None); + } + + // Generate the correct header: collections of points become multipoint, ensure + // dimensions are preserved if possible. + let mut new_item = Vec::new(); + let count_usize = self.count.try_into().unwrap(); + + if self.unique_dimensions.len() != 1 { + return exec_err!("Can't ST_Collect() mixed dimension geometries"); + } + + let dimensions = *self.unique_dimensions.iter().next().unwrap(); + if self.unique_geometry_types.len() == 1 { + match self.unique_geometry_types.iter().next().unwrap() { + GeometryTypeId::Point => { + write_wkb_multipoint_header(&mut new_item, dimensions, count_usize) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + GeometryTypeId::LineString => { + write_wkb_multilinestring_header(&mut new_item, dimensions, count_usize) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + GeometryTypeId::Polygon => { + write_wkb_multipolygon_header(&mut new_item, dimensions, count_usize) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + _ => { + write_wkb_geometrycollection_header(&mut new_item, dimensions, count_usize) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + } + } else { + write_wkb_geometrycollection_header(&mut new_item, dimensions, count_usize) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + } + + // Write the rest of item into the output and return it + new_item.extend(self.item.iter()); + Ok(Some(new_item)) + } +} + +impl Accumulator for CollectionAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let arg_types = [self.input_type.clone()]; + let args = [ColumnarValue::Array(values[0].clone())]; + let executor = WkbExecutor::new(&arg_types, &args); + executor.execute_wkb_void(|maybe_item| { + if let Some(item) = maybe_item { + let type_and_dims = GeometryTypeAndDimensions::try_from_geom(&item) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + 
self.unique_geometry_types + .insert(type_and_dims.geometry_type()); + self.unique_dimensions.insert(type_and_dims.dimensions()); + self.count += 1; + self.item.write_all(item.buf())?; + } + Ok(()) + })?; + Ok(()) + } + + fn evaluate(&mut self) -> Result { + let wkb = self.make_wkb_result()?; + Ok(ScalarValue::Binary(wkb)) + } + + fn state(&mut self) -> Result> { + let geometry_types_value = + serde_json::to_string(&self.unique_geometry_types.iter().collect::>()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let dimensions_value = serde_json::to_string( + &self + .unique_dimensions + .iter() + .map(|dim| GeometryTypeAndDimensions::new(GeometryTypeId::Geometry, *dim)) + .collect::>(), + ) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + let serialized_geometry_types = ScalarValue::Utf8(Some(geometry_types_value)); + let serialized_dimensions = ScalarValue::Utf8(Some(dimensions_value)); + let serialized_count = ScalarValue::Int64(Some(self.count)); + let serialized_item = ScalarValue::Binary(Some(self.item.clone())); + + Ok(vec![ + serialized_geometry_types, + serialized_dimensions, + serialized_count, + serialized_item, + ]) + } + + fn size(&self) -> usize { + size_of::() + self.item.capacity() + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + if states.len() != 4 { + return sedona_internal_err!( + "Unexpected number of state fields for st_collect() (expected 4, got {})", + states.len() + ); + } + + let mut geometry_types_iter = as_string_array(&states[0])?.into_iter(); + let mut dimensions_iter = as_string_array(&states[1])?.into_iter(); + let mut count_iter = as_int64_array(&states[2])?.into_iter(); + let mut item_iter = as_binary_array(&states[3])?.into_iter(); + + for _ in 0..geometry_types_iter.len() { + match ( + geometry_types_iter.next(), + dimensions_iter.next(), + count_iter.next(), + item_iter.next(), + ) { + ( + Some(Some(serialized_geometry_types)), + Some(Some(serialized_dimensions)), + Some(Some(count)), 
+ Some(Some(item)), + ) => { + let geometry_types = + serde_json::from_str::>(serialized_geometry_types) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + let dimensions = serde_json::from_str::>( + serialized_dimensions, + ) + .map_err(|e| DataFusionError::External(Box::new(e)))? + .into_iter() + .map(|item| item.dimensions()); + + self.unique_geometry_types.extend(geometry_types); + self.unique_dimensions.extend(dimensions); + self.count += count; + self.item.extend_from_slice(item); + } + _ => { + return sedona_internal_err!( + "unexpected nulls in st_collect() serialized state" + ) + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use datafusion_expr::AggregateUDF; + use rstest::rstest; + use sedona_schema::datatypes::WKB_VIEW_GEOMETRY; + use sedona_testing::{compare::assert_scalar_equal_wkb_geometry, testers::AggregateUdfTester}; + + use super::*; + + #[test] + fn udf_metadata() { + let udf: AggregateUDF = st_collect_udf().into(); + assert_eq!(udf.name(), "st_collect"); + assert!(udf.documentation().is_some()); + } + + #[rstest] + fn udf(#[values(WKB_GEOMETRY, WKB_VIEW_GEOMETRY)] sedona_type: SedonaType) { + let tester = AggregateUdfTester::new(st_collect_udf().into(), vec![sedona_type.clone()]); + assert_eq!(tester.return_type().unwrap(), WKB_GEOMETRY); + + // Finite point input with nulls + let batches = vec![ + vec![Some("POINT (0 1)"), None, Some("POINT (2 3)")], + vec![Some("POINT (4 5)"), None, Some("POINT (6 7)")], + ]; + assert_scalar_equal_wkb_geometry( + &tester.aggregate_wkt(batches).unwrap(), + Some("MULTIPOINT (0 1, 2 3, 4 5, 6 7)"), + ); + + // Finite linestring input with nulls + let batches = vec![ + vec![Some("LINESTRING (0 1, 2 3)"), None], + vec![Some("LINESTRING (4 5, 6 7)"), None], + ]; + assert_scalar_equal_wkb_geometry( + &tester.aggregate_wkt(batches).unwrap(), + Some("MULTILINESTRING ((0 1, 2 3), (4 5, 6 7))"), + ); + + // Finite polygon input with nulls + let batches = vec![ + vec![Some("POLYGON ((0 0, 1 0, 0 1, 0 
0))"), None], + vec![Some("POLYGON ((10 10, 11 10, 10 11, 10 10))"), None], + ]; + assert_scalar_equal_wkb_geometry( + &tester.aggregate_wkt(batches).unwrap(), + Some("MULTIPOLYGON (((0 0, 1 0, 0 1, 0 0)), ((10 10, 11 10, 10 11, 10 10)))"), + ); + + // Mixed input + let batches = vec![ + vec![Some("POINT (0 1)"), None], + vec![Some("LINESTRING (4 5, 6 7)"), None], + ]; + assert_scalar_equal_wkb_geometry( + &tester.aggregate_wkt(batches).unwrap(), + Some("GEOMETRYCOLLECTION (POINT (0 1), LINESTRING (4 5, 6 7))"), + ); + + // Empty input + assert_scalar_equal_wkb_geometry(&tester.aggregate_wkt(vec![]).unwrap(), None); + + // Error for mixed dimensions + let batches = vec![ + vec![Some("POINT (0 1)"), None], + vec![Some("POINT Z (0 1 2)"), None], + ]; + let err = tester.aggregate_wkt(batches).unwrap_err(); + assert_eq!( + err.message(), + "Can't ST_Collect() mixed dimension geometries" + ); + } +} diff --git a/rust/sedona-functions/src/st_makeline.rs b/rust/sedona-functions/src/st_makeline.rs new file mode 100644 index 000000000..7f99b3be6 --- /dev/null +++ b/rust/sedona-functions/src/st_makeline.rs @@ -0,0 +1,329 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+use std::{io::Write, sync::Arc, vec}; + +use arrow_array::builder::BinaryBuilder; +use datafusion_common::error::Result; +use datafusion_common::exec_err; +use datafusion_common::DataFusionError; +use datafusion_expr::{ + scalar_doc_sections::DOC_SECTION_OTHER, ColumnarValue, Documentation, Volatility, +}; +use geo_traits::{CoordTrait, GeometryTrait, LineStringTrait, MultiPointTrait, PointTrait}; +use sedona_common::sedona_internal_err; +use sedona_expr::scalar_udf::{ArgMatcher, SedonaScalarKernel, SedonaScalarUDF}; +use sedona_geometry::wkb_factory::write_wkb_linestring_header; +use sedona_schema::datatypes::WKB_GEOGRAPHY; +use sedona_schema::datatypes::{SedonaType, WKB_GEOMETRY}; + +use crate::executor::WkbExecutor; + +/// ST_MakeLine() scalar UDF implementation +/// +/// Native implementation that builds a LineString from two Point, LineString, or +/// MultiPoint inputs. Kernels for both geometry and geography output are registered here. +pub fn st_makeline_udf() -> SedonaScalarUDF { + SedonaScalarUDF::new( + "st_makeline", + vec![ + Arc::new(STMakeLine { + out_type: WKB_GEOMETRY, + }), + Arc::new(STMakeLine { + out_type: WKB_GEOGRAPHY, + }), + ], + Volatility::Immutable, + Some(doc()), + ) +} + +fn doc() -> Documentation { + Documentation::builder( + DOC_SECTION_OTHER, + "Construct a line".to_string(), + "ST_MakeLine (g1: Geometry or Geography, g2: Geometry or Geography)".to_string(), + ) + .with_argument("g1", "Geometry or Geography: The first point or geometry") + .with_argument("g2", "Geometry or Geography: The second point or geometry") + .with_sql_example("SELECT ST_MakeLine(ST_Point(0, 1), ST_Point(2, 3)) as geom") + .build() +} + +#[derive(Debug)] +struct STMakeLine { + out_type: SedonaType, +} + +impl SedonaScalarKernel for STMakeLine { + fn return_type(&self, args: &[SedonaType]) -> Result> { + let match_geom = ArgMatcher::is_geometry(); + let match_geog = ArgMatcher::is_geography(); + + let arg_matchers = if match_geom.match_type(&self.out_type) { + vec![match_geom.clone(), 
match_geom] + } else if match_geog.match_type(&self.out_type) { + vec![match_geog.clone(), match_geog] + } else { + return sedona_internal_err!("Unexpected ST_MakeLine() output"); + }; + + let matcher = ArgMatcher::new(arg_matchers, self.out_type.clone()); + + matcher.match_args(args) + } + + fn invoke_batch( + &self, + arg_types: &[SedonaType], + args: &[ColumnarValue], + ) -> Result { + let executor = WkbExecutor::new(arg_types, args); + + let min_segment_bytes = 1 + 4 + 4 + 16 + 16; + let mut builder = BinaryBuilder::with_capacity( + executor.num_iterations(), + min_segment_bytes * executor.num_iterations(), + ); + + let mut coords = Vec::new(); + + executor.execute_wkb_wkb_void(|lhs, rhs| { + match (lhs, rhs) { + (Some(lhs), Some(rhs)) => { + invoke_scalar(lhs, rhs, &mut coords, &mut builder)?; + builder.append_value([]); + } + _ => builder.append_null(), + }; + Ok(()) + })?; + executor.finish(Arc::new(builder.finish())) + } +} + +fn invoke_scalar( + g1: &impl GeometryTrait, + g2: &impl GeometryTrait, + coords: &mut Vec, + out: &mut impl Write, +) -> Result<()> { + if g1.dim() != g2.dim() { + return exec_err!("Can't ST_MakeLine() with mismatched dimensions"); + } + + coords.clear(); + let coord_size = g1.dim().size(); + + // Add the first item + add_coords(g1, coords, coord_size, None)?; + + // If the first item contributed any coordinates, copy its last coordinate and + // pass it to add_coords() so the second item's leading duplicate can be dropped + if coords.len() >= coord_size { + let mut last_coord = [0.0; 4]; + last_coord[0..coord_size] + .copy_from_slice(&coords[(coords.len() - coord_size)..coords.len()]); + add_coords(g2, coords, coord_size, Some(&last_coord[0..coord_size]))?; + } else { + add_coords(g2, coords, coord_size, None)?; + } + + let n_coords = coords.len() / coord_size; + write_wkb_linestring_header(out, g1.dim(), n_coords) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + for ord in coords { + out.write_all(&ord.to_le_bytes())?; + } + + Ok(()) +} + +fn add_coords( 
+ geom: &impl GeometryTrait, + coords: &mut Vec, + coord_size: usize, + last_coord: Option<&[f64]>, +) -> Result<()> { + match geom.as_type() { + geo_traits::GeometryType::Point(pt) => { + if let Some(coord) = pt.coord() { + for j in 0..coord_size { + coords.push(unsafe { coord.nth_unchecked(j) }); + } + } + } + geo_traits::GeometryType::LineString(ls) => { + for (i, coord) in ls.coords().enumerate() { + // Deduplicate the first point of any appended linestring + if i == 0 { + let mut tmp = Vec::new(); + for j in 0..coord_size { + tmp.push(unsafe { coord.nth_unchecked(j) }); + } + + if last_coord != Some(tmp.as_slice()) { + for item in tmp { + coords.push(item); + } + } + } else { + for j in 0..coord_size { + coords.push(unsafe { coord.nth_unchecked(j) }); + } + } + } + } + geo_traits::GeometryType::MultiPoint(mp) => { + for pt in mp.points() { + add_coords(&pt, coords, coord_size, None)?; + } + } + _ => { + return exec_err!( + "ST_MakeLine() only supports Point, LineString, and MultiPoint as input" + ) + } + } + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use datafusion_expr::ScalarUDF; + use rstest::rstest; + use sedona_schema::datatypes::{WKB_GEOGRAPHY, WKB_VIEW_GEOGRAPHY, WKB_VIEW_GEOMETRY}; + use sedona_testing::{ + testers::ScalarUdfTester, + {compare::assert_array_equal, create::create_array}, + }; + + #[test] + fn udf_metadata() { + let geom_from_point: ScalarUDF = st_makeline_udf().into(); + assert_eq!(geom_from_point.name(), "st_makeline"); + assert!(geom_from_point.documentation().is_some()); + } + + #[rstest] + fn udf_invoke(#[values(WKB_GEOMETRY, WKB_VIEW_GEOMETRY)] sedona_type: SedonaType) { + let tester = ScalarUdfTester::new( + st_makeline_udf().into(), + vec![sedona_type.clone(), sedona_type.clone()], + ); + tester.assert_return_type(WKB_GEOMETRY); + + // Basic usage + let result = tester + .invoke_scalar_scalar("POINT (0 1)", "POINT (2 3)") + .unwrap(); + tester.assert_scalar_result_equals(result, "LINESTRING (0 1, 2 3)"); + + // 
Deduplicating the first point of a linestring + let result = tester + .invoke_scalar_scalar("POINT (0 1)", "LINESTRING (0 1, 2 3)") + .unwrap(); + tester.assert_scalar_result_equals(result, "LINESTRING (0 1, 2 3)"); + + // Two linestrings should work as well + let result = tester + .invoke_scalar_scalar("LINESTRING (0 1, 2 3)", "LINESTRING (4 5, 6 7)") + .unwrap(); + tester.assert_scalar_result_equals(result, "LINESTRING (0 1, 2 3, 4 5, 6 7)"); + + // Also multipoints + let result = tester + .invoke_scalar_scalar("MULTIPOINT (0 1, 2 3)", "MULTIPOINT (4 5, 6 7)") + .unwrap(); + tester.assert_scalar_result_equals(result, "LINESTRING (0 1, 2 3, 4 5, 6 7)"); + + // Mismatched dimensions or unsupported types should error + let err = tester + .invoke_scalar_scalar("POINT (0 1)", "POINT Z (1 2 3)") + .unwrap_err(); + assert_eq!( + err.message(), + "Can't ST_MakeLine() with mismatched dimensions" + ); + + let err = tester + .invoke_scalar_scalar("POINT (0 1)", "POLYGON EMPTY") + .unwrap_err(); + assert_eq!( + err.message(), + "ST_MakeLine() only supports Point, LineString, and MultiPoint as input" + ); + + // Arrays, nulls, and dimensions + let array0 = create_array( + &[ + Some("POINT (0 1)"), + Some("POINT Z (0 1 2)"), + Some("POINT M (0 1 3)"), + Some("POINT ZM (0 1 2 3)"), + Some("POINT (0 0)"), + None, + None, + ], + &sedona_type, + ); + let array1 = create_array( + &[ + Some("POINT (10 11)"), + Some("POINT Z (10 11 12)"), + Some("POINT M (10 11 13)"), + Some("POINT ZM (10 11 12 13)"), + None, + Some("POINT (0 0)"), + None, + ], + &sedona_type, + ); + let expected = create_array( + &[ + Some("LINESTRING (0 1, 10 11)"), + Some("LINESTRING Z (0 1 2, 10 11 12)"), + Some("LINESTRING M (0 1 3, 10 11 13)"), + Some("LINESTRING ZM (0 1 2 3, 10 11 12 13)"), + None, + None, + None, + ], + &WKB_GEOMETRY, + ); + + let result = tester.invoke_array_array(array0, array1).unwrap(); + assert_array_equal(&result, &expected); + } + + #[rstest] + fn 
udf_invoke_geog(#[values(WKB_GEOGRAPHY, WKB_VIEW_GEOGRAPHY)] sedona_type: SedonaType) { + let tester = ScalarUdfTester::new( + st_makeline_udf().into(), + vec![sedona_type.clone(), sedona_type.clone()], + ); + tester.assert_return_type(WKB_GEOGRAPHY); + + // Basic usage + let result = tester + .invoke_scalar_scalar("POINT (0 1)", "POINT (2 3)") + .unwrap(); + tester.assert_scalar_result_equals(result, "LINESTRING (0 1, 2 3)"); + } +} diff --git a/rust/sedona-geometry/src/analyze.rs b/rust/sedona-geometry/src/analyze.rs index 2a8dea3eb..e34e191bd 100644 --- a/rust/sedona-geometry/src/analyze.rs +++ b/rust/sedona-geometry/src/analyze.rs @@ -15,10 +15,12 @@ // specific language governing permissions and limitations // under the License. use crate::{ - bounding_box::BoundingBox, error::SedonaGeometryError, interval::IntervalTrait, - point_count::count_points, types::GeometryTypeAndDimensions, + bounding_box::BoundingBox, + error::SedonaGeometryError, + interval::IntervalTrait, + point_count::count_points, + types::{GeometryTypeAndDimensions, GeometryTypeId}, }; -use geo_traits::{GeometryTrait, GeometryType}; use wkb::reader::Wkb; /// Contains analysis results for a geometry @@ -40,23 +42,7 @@ pub fn analyze_geometry(geom: &Wkb) -> Result 1, - GeometryType::LineString(_) => 2, - GeometryType::Polygon(_) => 3, - GeometryType::MultiPoint(_) => 4, - GeometryType::MultiLineString(_) => 5, - GeometryType::MultiPolygon(_) => 6, - GeometryType::GeometryCollection(_) => 7, - _ => 0, - }; - - // Handle the Result properly - let geometry_type_id = crate::types::GeometryTypeId::try_from_wkb_id(wkb_type_id) - .unwrap_or(crate::types::GeometryTypeId::Geometry); - - let geometry_type = GeometryTypeAndDimensions::new(geometry_type_id, geom.dim()); + let geometry_type = GeometryTypeAndDimensions::try_from_geom(geom)?; // Get point count directly using the geometry traits let point_count = count_points(geom); @@ -69,21 +55,24 @@ pub fn analyze_geometry(geom: &Wkb) -> Result Result { + 
let dimensions = geom.dim(); + let geometry_type = match geom.as_type() { + geo_traits::GeometryType::Point(_) => GeometryTypeId::Point, + geo_traits::GeometryType::LineString(_) => GeometryTypeId::LineString, + geo_traits::GeometryType::Polygon(_) => GeometryTypeId::Polygon, + geo_traits::GeometryType::MultiPoint(_) => GeometryTypeId::MultiPoint, + geo_traits::GeometryType::MultiLineString(_) => GeometryTypeId::MultiLineString, + geo_traits::GeometryType::MultiPolygon(_) => GeometryTypeId::MultiPolygon, + geo_traits::GeometryType::GeometryCollection(_) => GeometryTypeId::GeometryCollection, + _ => { + return Err(SedonaGeometryError::Invalid( + "Unsupported geometry type".to_string(), + )) + } + }; + + Ok(Self::new(geometry_type, dimensions)) + } + /// The [GeometryTypeId] pub fn geometry_type(&self) -> GeometryTypeId { self.geometry_type diff --git a/rust/sedona-testing/src/benchmark_util.rs b/rust/sedona-testing/src/benchmark_util.rs index 3ac7a03f2..6b55848d2 100644 --- a/rust/sedona-testing/src/benchmark_util.rs +++ b/rust/sedona-testing/src/benchmark_util.rs @@ -272,6 +272,8 @@ pub enum BenchmarkArgSpec { LineString(usize), /// Randomly generated polygon input with a specified number of vertices Polygon(usize), + /// Randomly generated multipoint input with a specified number of points + MultiPoint(usize), /// Randomly generated floating point input with a given range of values Float64(f64, f64), /// A transformation of any of the above based on a [ScalarUDF] accepting @@ -289,6 +291,7 @@ impl Debug for BenchmarkArgSpec { Self::Point => write!(f, "Point"), Self::LineString(arg0) => f.debug_tuple("LineString").field(arg0).finish(), Self::Polygon(arg0) => f.debug_tuple("Polygon").field(arg0).finish(), + Self::MultiPoint(arg0) => f.debug_tuple("MultiPoint").field(arg0).finish(), Self::Float64(arg0, arg1) => f.debug_tuple("Float64").field(arg0).field(arg1).finish(), Self::Transformed(inner, t) => write!(f, "{}({:?})", t.name(), inner), Self::String(s) => 
write!(f, "String({s})"), @@ -302,7 +305,8 @@ impl BenchmarkArgSpec { match self { BenchmarkArgSpec::Point | BenchmarkArgSpec::Polygon(_) - | BenchmarkArgSpec::LineString(_) => WKB_GEOMETRY, + | BenchmarkArgSpec::LineString(_) + | BenchmarkArgSpec::MultiPoint(_) => WKB_GEOMETRY, BenchmarkArgSpec::Float64(_, _) => SedonaType::Arrow(DataType::Float64), BenchmarkArgSpec::Transformed(inner, t) => { let tester = ScalarUdfTester::new(t.clone(), vec![inner.sedona_type()]); @@ -333,13 +337,14 @@ impl BenchmarkArgSpec { ) -> Result> { match self { BenchmarkArgSpec::Point => { - self.build_geometry(i, GeometryTypeId::Point, num_batches, 1, rows_per_batch) + self.build_geometry(i, GeometryTypeId::Point, num_batches, 1, 1, rows_per_batch) } BenchmarkArgSpec::LineString(vertex_count) => self.build_geometry( i, GeometryTypeId::LineString, num_batches, *vertex_count, + 1, rows_per_batch, ), BenchmarkArgSpec::Polygon(vertex_count) => self.build_geometry( @@ -347,6 +352,15 @@ impl BenchmarkArgSpec { GeometryTypeId::Polygon, num_batches, *vertex_count, + 1, + rows_per_batch, + ), + BenchmarkArgSpec::MultiPoint(part_count) => self.build_geometry( + i, + GeometryTypeId::MultiPoint, + num_batches, + 1, + *part_count, rows_per_batch, ), BenchmarkArgSpec::Float64(lo, hi) => { @@ -389,6 +403,7 @@ impl BenchmarkArgSpec { geom_type: GeometryTypeId, num_batches: usize, vertex_count: usize, + num_parts_count: usize, rows_per_batch: usize, ) -> Result> { let builder = RandomPartitionedDataBuilder::new() @@ -399,6 +414,7 @@ impl BenchmarkArgSpec { .bounds(Rect::new((-10.0, -10.0), (10.0, 10.0))) .size_range((0.1, 2.0)) .vertices_per_linestring_range((vertex_count, vertex_count)) + .num_parts_range((num_parts_count, num_parts_count)) .geometry_type(geom_type) // Currently just use WKB_GEOMETRY (we can generate a view type with // Transformed) @@ -561,7 +577,8 @@ mod test { #[values( (BenchmarkArgSpec::Point, GeometryTypeId::Point, 1), (BenchmarkArgSpec::LineString(10), 
GeometryTypeId::LineString, 10), - (BenchmarkArgSpec::Polygon(10), GeometryTypeId::Polygon, 11) + (BenchmarkArgSpec::Polygon(10), GeometryTypeId::Polygon, 11), + (BenchmarkArgSpec::MultiPoint(10), GeometryTypeId::MultiPoint, 10), )] config: (BenchmarkArgSpec, GeometryTypeId, i64), ) {