diff --git a/Cargo.lock b/Cargo.lock index bafb1b7ec..9a8e0b9d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5302,6 +5302,7 @@ dependencies = [ "geo-types", "proj-sys", "rstest", + "sedona-common", "sedona-expr", "sedona-functions", "sedona-geometry", diff --git a/c/sedona-proj/Cargo.toml b/c/sedona-proj/Cargo.toml index 4368a1120..0a145fd17 100644 --- a/c/sedona-proj/Cargo.toml +++ b/c/sedona-proj/Cargo.toml @@ -51,6 +51,7 @@ datafusion-common = { workspace = true } datafusion-expr = { workspace = true } geo-traits = { workspace = true } proj-sys = { version = "0.26.0", optional = true } +sedona-common = { workspace = true } sedona-expr = { workspace = true } sedona-functions = { workspace = true } sedona-geometry = { workspace = true } diff --git a/c/sedona-proj/src/st_transform.rs b/c/sedona-proj/src/st_transform.rs index 47a2a7937..b7fd90a6c 100644 --- a/c/sedona-proj/src/st_transform.rs +++ b/c/sedona-proj/src/st_transform.rs @@ -15,31 +15,353 @@ // specific language governing permissions and limitations // under the License. use crate::transform::{ProjCrsEngine, ProjCrsEngineBuilder}; -use arrow_array::builder::BinaryBuilder; +use arrow_array::builder::{BinaryBuilder, StringViewBuilder}; +use arrow_array::ArrayRef; use arrow_schema::DataType; -use datafusion_common::{DataFusionError, Result, ScalarValue}; +use datafusion_common::cast::{as_string_view_array, as_struct_array}; +use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue}; use datafusion_expr::ColumnarValue; -use geo_traits::to_geo::ToGeoGeometry; +use sedona_common::sedona_internal_err; +use sedona_expr::item_crs::make_item_crs; use sedona_expr::scalar_udf::{ScalarKernelRef, SedonaScalarKernel}; use sedona_functions::executor::WkbExecutor; use sedona_geometry::transform::{transform, CachingCrsEngine, CrsEngine, CrsTransform}; use sedona_geometry::wkb_factory::WKB_MIN_PROBABLE_BYTES; -use sedona_schema::crs::deserialize_crs; -use sedona_schema::datatypes::{Edges, SedonaType}; +use sedona_schema::crs::{deserialize_crs, Crs}; +use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY, WKB_GEOMETRY_ITEM_CRS}; use sedona_schema::matchers::ArgMatcher; use std::cell::OnceCell; -use std::rc::Rc; +use std::io::Write; +use std::iter::zip; use std::sync::{Arc, RwLock}; use wkb::reader::Wkb; -#[derive(Debug)] -struct STTransform {} - /// ST_Transform() implementation using the proj crate pub fn st_transform_impl() -> ScalarKernelRef { Arc::new(STTransform {}) } +#[derive(Debug)] +struct STTransform {} + +impl SedonaScalarKernel for STTransform { + fn return_type_from_args_and_scalars( + &self, + arg_types: &[SedonaType], + scalar_args: &[Option<&ScalarValue>], + ) -> Result> { + let inputs = zip(arg_types, scalar_args) + .map(|(arg_type, arg_scalar)| ArgInput::from_return_type_arg(arg_type, *arg_scalar)) + .collect::>(); + + if inputs.len() == 2 { + match (inputs[0], inputs[1]) { + // ScalarCrs output always returns a Wkb output type with concrete Crs + (ArgInput::Geo(_), ArgInput::ScalarCrs(scalar_value)) + | (ArgInput::ItemCrs, ArgInput::ScalarCrs(scalar_value)) => { + Ok(Some(output_type_from_scalar_crs_value(scalar_value)?)) + } + + // Geo or ItemCrs with ArrayCrs output always return ItemCrs output + (ArgInput::Geo(_), ArgInput::ArrayCrs) + | (ArgInput::ItemCrs, ArgInput::ArrayCrs) => { + Ok(Some(WKB_GEOMETRY_ITEM_CRS.clone())) + } + _ => Ok(None), + } + } else if inputs.len() == 3 { + match (inputs[0], inputs[1], inputs[2]) { + // ScalarCrs output always returns a Wkb output type with concrete Crs + (ArgInput::Geo(_), ArgInput::ScalarCrs(_), ArgInput::ScalarCrs(scalar_value)) + | (ArgInput::Geo(_), ArgInput::ArrayCrs, ArgInput::ScalarCrs(scalar_value)) + | (ArgInput::ItemCrs, ArgInput::ScalarCrs(_), ArgInput::ScalarCrs(scalar_value)) + | (ArgInput::ItemCrs, ArgInput::ArrayCrs, ArgInput::ScalarCrs(scalar_value)) => { + Ok(Some(output_type_from_scalar_crs_value(scalar_value)?)) + } + + // Geo or ItemCrs with ArrayCrs output always return ItemCrs output + (ArgInput::Geo(_), ArgInput::ScalarCrs(_), ArgInput::ArrayCrs) + | (ArgInput::Geo(_), ArgInput::ArrayCrs, ArgInput::ArrayCrs) + | (ArgInput::ItemCrs, ArgInput::ScalarCrs(_), ArgInput::ArrayCrs) + | (ArgInput::ItemCrs, ArgInput::ArrayCrs, ArgInput::ArrayCrs) => { + Ok(Some(WKB_GEOMETRY_ITEM_CRS.clone())) + } + _ => Ok(None), + } + } else { + Ok(None) + } + } + + fn invoke_batch_from_args( + &self, + arg_types: &[SedonaType], + args: &[ColumnarValue], + _return_type: &SedonaType, + _num_rows: usize, + ) -> Result { + let inputs = zip(arg_types, args) + .map(|(arg_type, arg)| ArgInput::from_arg(arg_type, arg)) + .collect::>(); + + let executor = WkbExecutor::new(arg_types, args); + let mut builder = BinaryBuilder::with_capacity( + executor.num_iterations(), + WKB_MIN_PROBABLE_BYTES * executor.num_iterations(), + ); + + // Optimize the easy case, where we have exactly one transformation and there are no + // null or missing CRSes to contend with. + let from_index = inputs.len() - 2; + let to_index = inputs.len() - 1; + let (from, to) = (inputs[from_index], inputs[to_index]); + if let (Some(from_constant), Some(to_constant)) = (from.crs_constant()?, to.crs_constant()?) + { + let maybe_from_crs = deserialize_crs(&from_constant)?; + let maybe_to_crs = deserialize_crs(&to_constant)?; + if let (Some(from_crs), Some(to_crs)) = (maybe_from_crs, maybe_to_crs) { + with_global_proj_engine(|engine| { + let crs_transform = engine + .get_transform_crs_to_crs( + &from_crs.to_crs_string(), + &to_crs.to_crs_string(), + None, + "", + ) + .map_err(|e| DataFusionError::Execution(format!("{e}")))?; + executor.execute_wkb_void(|maybe_wkb| { + match maybe_wkb { + Some(wkb) => { + invoke_scalar(&wkb, crs_transform.as_ref(), &mut builder)?; + builder.append_value([]); + } + None => builder.append_null(), + } + Ok(()) + })?; + Ok(()) + })?; + return executor.finish(Arc::new(builder.finish())); + } + } + + // Iterate over pairs of CRS strings + let from_crs_array = from.crs_array(&args[from_index], executor.num_iterations())?; + let to_crs_array = to.crs_array(&args[to_index], executor.num_iterations())?; + let from_crs_string_view_array = as_string_view_array(&from_crs_array)?; + let to_crs_string_view_array = as_string_view_array(&to_crs_array)?; + let mut crs_to_crs_iter = zip(from_crs_string_view_array, to_crs_string_view_array); + + // We might need to build an output array of sanitized CRS strings + let mut maybe_crs_output = if matches!(to, ArgInput::ArrayCrs) { + Some(StringViewBuilder::with_capacity(executor.num_iterations())) + } else { + None + }; + + with_global_proj_engine(|engine| { + executor.execute_wkb_void(|maybe_wkb| { + match (maybe_wkb, crs_to_crs_iter.next().unwrap()) { + (Some(wkb), (Some(from_crs_str), Some(to_crs_str))) => { + let maybe_from_crs = deserialize_crs(from_crs_str)?; + let maybe_to_crs = deserialize_crs(to_crs_str)?; + + if let Some(crs_output) = &mut maybe_crs_output { + if let Some(to_crs) = &maybe_to_crs { + crs_output.append_value(to_crs.to_authority_code()?.unwrap_or_else(|| to_crs.to_crs_string())); + } else { + crs_output.append_null(); + } + } + + if maybe_from_crs == maybe_to_crs { + invoke_noop(&wkb, &mut builder)?; + builder.append_value([]); + return Ok(()); + } + + let crs_transform = match (maybe_from_crs, maybe_to_crs) { + (Some(from_crs), Some(to_crs)) => { + engine + .get_transform_crs_to_crs(&from_crs.to_crs_string(), &to_crs.to_crs_string(), None, "") + .map_err(|e| DataFusionError::Execution(format!("{e}")))? + }, + _ => return exec_err!( + "Can't transform to or from an unset CRS. Do you need to call ST_SetSRID on the input?" + ) + }; + + invoke_scalar(&wkb, crs_transform.as_ref(), &mut builder)?; + builder.append_value([]); + } + _ => { + if let Some(crs_output) = &mut maybe_crs_output { + crs_output.append_null(); + } + + builder.append_null() + }, + } + Ok(()) + })?; + Ok(()) + })?; + + let output_geometry = executor.finish(Arc::new(builder.finish()))?; + if let Some(mut crs_output) = maybe_crs_output { + let output_crs = executor.finish(Arc::new(crs_output.finish()))?; + make_item_crs(&WKB_GEOMETRY, output_geometry, &output_crs, None) + } else { + Ok(output_geometry) + } + } + + fn return_type(&self, _args: &[SedonaType]) -> Result, DataFusionError> { + sedona_internal_err!("Return type should only be called with args") + } + + fn invoke_batch( + &self, + _arg_types: &[SedonaType], + _args: &[ColumnarValue], + ) -> Result { + sedona_internal_err!("invoke_batch should only be called with args") + } +} + +fn output_type_from_scalar_crs_value(scalar_arg: &ScalarValue) -> Result { + if let Some(crs_str) = parse_crs_from_scalar_crs_value(scalar_arg)? { + Ok(SedonaType::Wkb(Edges::Planar, deserialize_crs(&crs_str)?)) + } else { + Ok(WKB_GEOMETRY) + } +} + +fn parse_crs_from_scalar_crs_value(scalar_arg: &ScalarValue) -> Result> { + if let ScalarValue::Utf8(maybe_to_crs_str) = scalar_arg.cast_to(&DataType::Utf8)? { + if let Some(to_crs_str) = maybe_to_crs_str { + Ok(Some( + deserialize_crs(&to_crs_str)? + .map(|crs| crs.to_crs_string()) + .unwrap_or("0".to_string()), + )) + } else { + Ok(None) + } + } else { + sedona_internal_err!("Expected scalar cast to utf8 to be a ScalarValue::Utf8") + } +} + +fn invoke_noop(wkb: &Wkb, builder: &mut impl Write) -> Result<()> { + builder + .write_all(wkb.buf()) + .map_err(DataFusionError::IoError) +} + +fn invoke_scalar(wkb: &Wkb, trans: &dyn CrsTransform, builder: &mut impl Write) -> Result<()> { + transform(wkb, trans, builder) + .map_err(|err| DataFusionError::Execution(format!("Transform error: {err}")))?; + Ok(()) +} + +/// Helper to label arguments because we have a lot argument types that are valid +#[derive(Debug, Clone, Copy)] +enum ArgInput<'a> { + /// Geometry input. This currently only matches geometry and not geography + /// because CRS support for geography is less clear at the moment. Must be + /// the first argument (and not supported for other arguments). + Geo(&'a Crs), + /// Item-level CRS input. Must be the first argument if present (not supported + /// for other arguments). + ItemCrs, + /// Scalar CRS input. Supported for second and third arguments. When present + /// as the last argument (to), this forces type-level CRS output. + ScalarCrs(&'a ScalarValue), + /// Array CRS input. Supported for second and third arguments. When present + /// as the last (to) argument, this forces Item CRS output. + ArrayCrs, + /// Sentinel for anything else + Unsupported, +} + +impl<'a> ArgInput<'a> { + fn from_return_type_arg(arg_type: &'a SedonaType, scalar_arg: Option<&'a ScalarValue>) -> Self { + if ArgMatcher::is_item_crs().match_type(arg_type) { + Self::ItemCrs + } else if ArgMatcher::is_numeric().match_type(arg_type) + || ArgMatcher::is_string().match_type(arg_type) + { + if let Some(scalar_crs) = scalar_arg { + Self::ScalarCrs(scalar_crs) + } else { + Self::ArrayCrs + } + } else { + match arg_type { + SedonaType::Wkb(Edges::Planar, crs) | SedonaType::WkbView(Edges::Planar, crs) => { + Self::Geo(crs) + } + _ => Self::Unsupported, + } + } + } + + fn from_arg(arg_type: &'a SedonaType, arg: &'a ColumnarValue) -> Self { + if ArgMatcher::is_item_crs().match_type(arg_type) { + Self::ItemCrs + } else if ArgMatcher::is_numeric().match_type(arg_type) + || ArgMatcher::is_string().match_type(arg_type) + { + match arg { + ColumnarValue::Array(_) => Self::ArrayCrs, + ColumnarValue::Scalar(scalar_value) => Self::ScalarCrs(scalar_value), + } + } else { + match arg_type { + SedonaType::Wkb(_, crs) | SedonaType::WkbView(_, crs) => Self::Geo(crs), + _ => Self::Unsupported, + } + } + } + + fn crs_constant(&self) -> Result> { + match self { + ArgInput::Geo(crs) => { + let crs_str = if let Some(crs) = crs { + crs.to_crs_string() + } else { + "0".to_string() + }; + + Ok(Some(crs_str)) + } + ArgInput::ScalarCrs(scalar_value) => parse_crs_from_scalar_crs_value(scalar_value), + _ => Ok(None), + } + } + + fn crs_array(&self, arg: &ColumnarValue, iterations: usize) -> Result { + if let Some(crs_constant) = self.crs_constant()? { + ScalarValue::Utf8View(Some(crs_constant)).to_array_of_size(iterations) + } else if matches!(self, Self::ItemCrs) { + match arg { + ColumnarValue::Array(array) => { + let struct_array = as_struct_array(array)?; + Ok(struct_array.column(1).clone()) + } + ColumnarValue::Scalar(ScalarValue::Struct(struct_array)) => { + Ok(struct_array.column(1).clone()) + } + _ => sedona_internal_err!("Unexpected item_crs type"), + } + } else { + arg.cast_to(&DataType::Utf8View, None)? + .into_array(iterations) + } + } +} + /// Configure the global PROJ engine /// /// Provides an opportunity for a calling application to provide the @@ -113,503 +435,370 @@ thread_local! { }; } -struct TransformArgIndexes { - wkb: usize, - first_crs: usize, - second_crs: Option, - lenient: Option, -} - -impl TransformArgIndexes { - fn new() -> Self { - Self { - wkb: 0, - first_crs: 1, - second_crs: None, - lenient: None, - } - } -} - -fn define_arg_indexes(arg_types: &[SedonaType], indexes: &mut TransformArgIndexes) { - indexes.wkb = 0; - indexes.first_crs = 1; - - for (i, arg_type) in arg_types.iter().enumerate().skip(2) { - if ArgMatcher::is_numeric().match_type(arg_type) - || ArgMatcher::is_string().match_type(arg_type) - { - indexes.second_crs = Some(i); - } else if *arg_type == SedonaType::Arrow(DataType::Boolean) { - indexes.lenient = Some(i); - } - } -} - -impl SedonaScalarKernel for STTransform { - fn return_type(&self, _args: &[SedonaType]) -> Result, DataFusionError> { - Err(DataFusionError::Internal( - "Return type should only be called with args".to_string(), - )) - } - fn return_type_from_args_and_scalars( - &self, - arg_types: &[SedonaType], - scalar_args: &[Option<&ScalarValue>], - ) -> Result> { - let matcher = ArgMatcher::new( - vec![ - ArgMatcher::is_geometry(), - ArgMatcher::or(vec![ArgMatcher::is_numeric(), ArgMatcher::is_string()]), - ArgMatcher::optional(ArgMatcher::or(vec![ - ArgMatcher::is_numeric(), - ArgMatcher::is_string(), - ])), - ArgMatcher::optional(ArgMatcher::is_boolean()), - ], - SedonaType::Wkb(Edges::Planar, None), - ); - - if !matcher.matches(arg_types) { - return Ok(None); - } - - let mut indexes = TransformArgIndexes::new(); - define_arg_indexes(arg_types, &mut indexes); - - let scalar_arg_opt = if let Some(second_crs_index) = indexes.second_crs { - scalar_args.get(second_crs_index).unwrap() - } else { - scalar_args.get(indexes.first_crs).unwrap() - }; - - let crs_str_opt = if let Some(scalar_crs) = scalar_arg_opt { - to_crs_str(scalar_crs) - } else { - None - }; - - // If there is no CRS argument, we cannot determine the return type. - match crs_str_opt { - Some(to_crs) => { - let crs = deserialize_crs(&to_crs)?; - Ok(Some(SedonaType::Wkb(Edges::Planar, crs))) - } - _ => Ok(Some(SedonaType::Wkb(Edges::Planar, None))), - } - } - - fn invoke_batch( - &self, - arg_types: &[SedonaType], - args: &[ColumnarValue], - ) -> Result { - let executor = WkbExecutor::new(arg_types, args); - let mut builder = BinaryBuilder::with_capacity( - executor.num_iterations(), - WKB_MIN_PROBABLE_BYTES * executor.num_iterations(), - ); - - let mut indexes = TransformArgIndexes::new(); - define_arg_indexes(arg_types, &mut indexes); - - let first_crs = get_crs_str(args, indexes.first_crs).ok_or_else(|| { - DataFusionError::Execution( - "First CRS argument must be a string or numeric scalar".to_string(), - ) - })?; - - let lenient = indexes - .lenient - .is_some_and(|i| get_scalar_bool(args, i).unwrap_or(false)); - - let second_crs = if let Some(second_crs_index) = indexes.second_crs { - get_crs_str(args, second_crs_index) - } else { - None - }; - - with_global_proj_engine(|engine| { - let crs_from_geo = parse_source_crs(&arg_types[indexes.wkb])?; - - let transform = match &second_crs { - Some(to_crs) => get_transform_crs_to_crs(engine, &first_crs, to_crs)?, - None => get_transform_to_crs(engine, crs_from_geo, &first_crs, lenient)?, - }; - - executor.execute_wkb_void(|maybe_wkb| { - match maybe_wkb { - Some(wkb) => invoke_scalar(&wkb, transform.as_ref(), &mut builder)?, - None => builder.append_null(), - } - - Ok(()) - })?; - - Ok(()) - })?; - - executor.finish(Arc::new(builder.finish())) - } -} - -fn get_transform_to_crs( - engine: &dyn CrsEngine, - source_crs_opt: Option, - to_crs: &str, - lenient: bool, -) -> Result, DataFusionError> { - let from_crs = match source_crs_opt { - Some(crs) => crs, - None if lenient => "EPSG:4326".to_string(), - None => { - return Err(DataFusionError::Execution( - "Source CRS is required when transforming to a CRS".to_string(), - )) - } - }; - get_transform_crs_to_crs(engine, &from_crs, to_crs) -} - -fn get_transform_crs_to_crs( - engine: &dyn CrsEngine, - from_crs: &str, - to_crs: &str, -) -> Result, DataFusionError> { - engine - .get_transform_crs_to_crs(from_crs, to_crs, None, "") - .map_err(|err| DataFusionError::Execution(format!("Transform error: {err}"))) -} - -fn invoke_scalar(wkb: &Wkb, trans: &dyn CrsTransform, builder: &mut BinaryBuilder) -> Result<()> { - let geo_geom = wkb.to_geometry(); - transform(&geo_geom, trans, builder) - .map_err(|err| DataFusionError::Execution(format!("Transform error: {err}")))?; - builder.append_value([]); - Ok(()) -} - -fn parse_source_crs(source_type: &SedonaType) -> Result> { - match source_type { - SedonaType::Wkb(_, Some(crs)) | SedonaType::WkbView(_, Some(crs)) => { - Ok(Some(crs.to_crs_string())) - } - _ => Ok(None), - } -} - -fn to_crs_str(scalar_arg: &ScalarValue) -> Option { - if let Ok(ScalarValue::Utf8(Some(crs))) = scalar_arg.cast_to(&DataType::Utf8) { - if crs.chars().all(|c| c.is_ascii_digit()) { - return Some(format!("EPSG:{crs}")); - } else { - return Some(crs); - } - } - - None -} - -fn get_crs_str(args: &[ColumnarValue], index: usize) -> Option { - if let ColumnarValue::Scalar(scalar_crs) = &args[index] { - return to_crs_str(scalar_crs); - } - None -} - -fn get_scalar_bool(args: &[ColumnarValue], index: usize) -> Option { - if let Some(ColumnarValue::Scalar(ScalarValue::Boolean(opt_bool))) = args.get(index) { - *opt_bool - } else { - None - } -} - #[cfg(test)] mod tests { use super::*; + use arrow_array::create_array; use arrow_array::ArrayRef; - use arrow_schema::{DataType, Field}; - use datafusion_common::config::ConfigOptions; - use datafusion_expr::{ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl}; + use arrow_schema::DataType; use rstest::rstest; use sedona_expr::scalar_udf::SedonaScalarUDF; use sedona_schema::crs::lnglat; use sedona_schema::crs::Crs; use sedona_schema::datatypes::WKB_GEOMETRY; - use sedona_testing::compare::assert_value_equal; - use sedona_testing::{create::create_array, create::create_array_value}; + use sedona_testing::compare::assert_array_equal; + use sedona_testing::create::create_array; + use sedona_testing::create::create_array_item_crs; + use sedona_testing::create::create_scalar; + use sedona_testing::testers::ScalarUdfTester; const NAD83ZONE6PROJ: &str = "EPSG:2230"; const WGS84: &str = "EPSG:4326"; - #[rstest] - fn invalid_arg_checks() { - let udf: SedonaScalarUDF = SedonaScalarUDF::from_impl("st_transform", st_transform_impl()); - - // No args - let result = udf.return_field_from_args(ReturnFieldArgs { - arg_fields: &[], - scalar_arguments: &[], - }); - assert!( - result.is_err() - && result - .unwrap_err() - .to_string() - .contains("No kernel matching arguments") + #[test] + fn test_invoke_with_string() { + let udf = SedonaScalarUDF::from_impl("st_transform", st_transform_impl()); + let geometry_input = SedonaType::Wkb(Edges::Planar, lnglat()); + let tester = ScalarUdfTester::new( + udf.into(), + vec![geometry_input.clone(), SedonaType::Arrow(DataType::Utf8)], ); - // Too many args - let arg_types = [ - WKB_GEOMETRY, - SedonaType::Arrow(DataType::Utf8), - SedonaType::Arrow(DataType::Utf8), - SedonaType::Arrow(DataType::Boolean), - SedonaType::Arrow(DataType::Int32), - ]; - let arg_fields: Vec> = arg_types - .iter() - .map(|arg_type| Arc::new(arg_type.to_storage_field("", true).unwrap())) - .collect(); - let result = udf.return_field_from_args(ReturnFieldArgs { - arg_fields: &arg_fields, - scalar_arguments: &[None, None, None, None, None], - }); - assert!( - result.is_err() - && result - .unwrap_err() - .to_string() - .contains("No kernel matching arguments") - ); + // Return type with scalar to argument (returns type-level CRS) + let expected_return_type = SedonaType::Wkb(Edges::Planar, get_crs(NAD83ZONE6PROJ)); + let return_type = tester + .return_type_with_scalar_scalar(Option::<&str>::None, Some(NAD83ZONE6PROJ)) + .unwrap(); + assert_eq!(return_type, expected_return_type); - // First arg not geometry - let arg_types = [ - SedonaType::Arrow(DataType::Utf8), - SedonaType::Arrow(DataType::Utf8), - ]; - let arg_fields: Vec> = arg_types - .iter() - .map(|arg_type| Arc::new(arg_type.to_storage_field("", true).unwrap())) - .collect(); - let result = udf.return_field_from_args(ReturnFieldArgs { - arg_fields: &arg_fields, - scalar_arguments: &[None, None], - }); - assert!( - result.is_err() - && result - .unwrap_err() - .to_string() - .contains("No kernel matching arguments") + // Return type with array to argument (returns item CRS) + let return_type = tester.return_type().unwrap(); + assert_eq!(return_type, WKB_GEOMETRY_ITEM_CRS.clone()); + + // Invoke with scalar to argument (returns type-level CRS) + let expected_array = create_array( + &[None, Some("POINT (-21508577.363421552 34067918.06097863)")], + &expected_return_type, ); + let wkb = create_array(&[None, Some("POINT (79.3871 43.6426)")], &geometry_input); + let result = tester.invoke_array_scalar(wkb, NAD83ZONE6PROJ).unwrap(); + assert_array_equal(&result, &expected_array); - // Second arg not string or numeric - let arg_types = [WKB_GEOMETRY, SedonaType::Arrow(DataType::Boolean)]; - let arg_fields: Vec> = arg_types - .iter() - .map(|arg_type| Arc::new(arg_type.to_storage_field("", true).unwrap())) - .collect(); - let result = udf.return_field_from_args(ReturnFieldArgs { - arg_fields: &arg_fields, - scalar_arguments: &[None, None], - }); - assert!( - result.is_err() - && result - .unwrap_err() - .to_string() - .contains("No kernel matching arguments") + // Invoke with array to argument (returns item CRS) + let expected_array = create_array_item_crs( + &[None, Some("POINT (-21508577.363421552 34067918.06097863)")], + [None, Some(NAD83ZONE6PROJ)], + &WKB_GEOMETRY, ); + let wkb = create_array(&[None, Some("POINT (79.3871 43.6426)")], &geometry_input); + let crs = create_array!(Utf8, [None, Some(NAD83ZONE6PROJ)]) as ArrayRef; + let result = tester.invoke_array_array(wkb, crs).unwrap(); + assert_array_equal(&result, &expected_array); } - #[rstest] - fn test_invoke_batch_with_geo_crs() { - // From-CRS pulled from sedona type - let arg_types = [ - SedonaType::Wkb(Edges::Planar, lnglat()), - SedonaType::Arrow(DataType::Utf8), - ]; + #[test] + fn test_invoke_with_srid() { + let udf = SedonaScalarUDF::from_impl("st_transform", st_transform_impl()); + let geometry_input = SedonaType::Wkb(Edges::Planar, lnglat()); + let tester = ScalarUdfTester::new( + udf.into(), + vec![geometry_input.clone(), SedonaType::Arrow(DataType::UInt32)], + ); - let wkb = create_array(&[None, Some("POINT (79.3871 43.6426)")], &arg_types[0]); + // Return type with scalar to argument (returns type-level CRS) + let expected_return_type = SedonaType::Wkb(Edges::Planar, get_crs(NAD83ZONE6PROJ)); + let return_type = tester + .return_type_with_scalar_scalar(Option::<&str>::None, Some(2230)) + .unwrap(); + assert_eq!(return_type, expected_return_type); - let scalar_args = vec![ScalarValue::Utf8(Some(NAD83ZONE6PROJ.to_string()))]; + // Return type with array to argument (returns item CRS) + let return_type = tester.return_type().unwrap(); + assert_eq!(return_type, WKB_GEOMETRY_ITEM_CRS.clone()); - let expected = create_array_value( + // Invoke with scalar to argument (returns type-level CRS) + let expected_array = create_array( &[None, Some("POINT (-21508577.363421552 34067918.06097863)")], - &SedonaType::Wkb(Edges::Planar, get_crs(NAD83ZONE6PROJ)), + &expected_return_type, ); + let wkb = create_array(&[None, Some("POINT (79.3871 43.6426)")], &geometry_input); + let result = tester.invoke_array_scalar(wkb, 2230).unwrap(); + assert_array_equal(&result, &expected_array); - let (result_type, result_col) = - invoke_udf_test(wkb, scalar_args, arg_types.to_vec()).unwrap(); - assert_value_equal(&result_col, &expected); - assert_eq!( - result_type, - SedonaType::Wkb(Edges::Planar, get_crs(NAD83ZONE6PROJ)) + // Invoke with array to argument (returns item CRS) + let expected_array = create_array_item_crs( + &[None, Some("POINT (-21508577.363421552 34067918.06097863)")], + [None, Some(NAD83ZONE6PROJ)], + &WKB_GEOMETRY, ); + let wkb = create_array(&[None, Some("POINT (79.3871 43.6426)")], &geometry_input); + let crs = create_array!(Int32, [None, Some(2230)]) as ArrayRef; + let result = tester.invoke_array_array(wkb, crs).unwrap(); + assert_array_equal(&result, &expected_array); } - #[rstest] - fn test_invoke_with_srids() { - // Use an integer SRID for the to CRS - let arg_types = [ - SedonaType::Wkb(Edges::Planar, lnglat()), - SedonaType::Arrow(DataType::UInt32), - ]; - - let wkb = create_array(&[None, Some("POINT (79.3871 43.6426)")], &arg_types[0]); - - let scalar_args = vec![ScalarValue::UInt32(Some(2230))]; + #[test] + fn test_invoke_with_item_crs() { + let udf = SedonaScalarUDF::from_impl("st_transform", st_transform_impl()); + let geometry_input = WKB_GEOMETRY_ITEM_CRS.clone(); + let tester = ScalarUdfTester::new( + udf.into(), + vec![geometry_input.clone(), SedonaType::Arrow(DataType::Utf8)], + ); - let expected = create_array_value( + // Return type with scalar to argument (returns type-level CRS) + // This is the same as for normal input + let expected_return_type = SedonaType::Wkb(Edges::Planar, get_crs(NAD83ZONE6PROJ)); + let return_type = tester + .return_type_with_scalar_scalar(Option::<&str>::None, Some(NAD83ZONE6PROJ)) + .unwrap(); + assert_eq!(return_type, expected_return_type); + + // Return type with array to argument (returns item CRS) + // This is the same as for normal input + let return_type = tester.return_type().unwrap(); + assert_eq!(return_type, WKB_GEOMETRY_ITEM_CRS.clone()); + + // Invoke with scalar to argument (returns type-level CRS) + let expected_array = create_array( &[None, Some("POINT (-21508577.363421552 34067918.06097863)")], - &SedonaType::Wkb(Edges::Planar, get_crs(NAD83ZONE6PROJ)), + &expected_return_type, ); - - let (result_type, result_col) = - invoke_udf_test(wkb, scalar_args, arg_types.to_vec()).unwrap(); - assert_value_equal(&result_col, &expected); - assert_eq!( - result_type, - SedonaType::Wkb(Edges::Planar, get_crs(NAD83ZONE6PROJ)) + let array_in = create_array_item_crs( + &[None, Some("POINT (79.3871 43.6426)")], + [None, Some("EPSG:4326")], + &WKB_GEOMETRY, ); - } + let result = tester + .invoke_array_scalar(array_in, NAD83ZONE6PROJ) + .unwrap(); + assert_array_equal(&result, &expected_array); - #[rstest] - fn test_invoke_batch_with_lenient() { - let arg_types = [ - WKB_GEOMETRY, - SedonaType::Arrow(DataType::Utf8), - SedonaType::Arrow(DataType::Boolean), - ]; - - let wkb = create_array(&[None, Some("POINT (79.3871 43.6426)")], &WKB_GEOMETRY); - let scalar_args = vec![ - ScalarValue::Utf8(Some(NAD83ZONE6PROJ.to_string())), - ScalarValue::Boolean(Some(true)), - ]; - - let expected = create_array_value( + // Invoke with array to argument (returns item CRS) + let expected_array = create_array_item_crs( &[None, Some("POINT (-21508577.363421552 34067918.06097863)")], - &SedonaType::Wkb(Edges::Planar, Some(get_crs(NAD83ZONE6PROJ).unwrap())), + [None, Some(NAD83ZONE6PROJ)], + &WKB_GEOMETRY, ); - - let (result_type, result_col) = - invoke_udf_test(wkb, scalar_args, arg_types.to_vec()).unwrap(); - assert_value_equal(&result_col, &expected); - assert_eq!( - result_type, - SedonaType::Wkb(Edges::Planar, Some(get_crs(NAD83ZONE6PROJ).unwrap())) + let array_in = create_array_item_crs( + &[None, Some("POINT (79.3871 43.6426)")], + [None, Some("EPSG:4326")], + &WKB_GEOMETRY, ); + let crs = create_array!(Utf8, [None, Some(NAD83ZONE6PROJ)]) as ArrayRef; + let result = tester.invoke_array_array(array_in, crs).unwrap(); + assert_array_equal(&result, &expected_array); } #[rstest] - fn test_invoke_batch_one_crs_no_lenient() { - let arg_types = [WKB_GEOMETRY, SedonaType::Arrow(DataType::Utf8)]; + fn test_invoke_source_arg() { + let udf = SedonaScalarUDF::from_impl("st_transform", st_transform_impl()); + let geometry_input = WKB_GEOMETRY; + let tester = ScalarUdfTester::new( + udf.into(), + vec![ + geometry_input.clone(), + SedonaType::Arrow(DataType::Utf8), + SedonaType::Arrow(DataType::Utf8), + ], + ); - let wkb = create_array(&[None, Some("POINT (79.3871 43.6426)")], &WKB_GEOMETRY); - let scalar_args = vec![ScalarValue::Utf8(Some(NAD83ZONE6PROJ.to_string()))]; + // Return type with scalar to argument (returns type-level CRS) + // This is the same as for normal input + let expected_return_type = SedonaType::Wkb(Edges::Planar, get_crs(NAD83ZONE6PROJ)); + let return_type = tester + .return_type_with_scalar_scalar_scalar( + Option::<&str>::None, + Option::<&str>::None, + Some(NAD83ZONE6PROJ), + ) + .unwrap(); + assert_eq!(return_type, expected_return_type); - let err = invoke_udf_test(wkb, scalar_args, arg_types.to_vec()); - assert!( - matches!(err, Err(DataFusionError::Execution(_))), - "Expected an Execution error" - ); - } + // Return type with array to argument (returns item CRS) + // This is the same as for normal input + let return_type = tester.return_type().unwrap(); + assert_eq!(return_type, WKB_GEOMETRY_ITEM_CRS.clone()); - #[rstest] - fn test_invoke_batch_with_source_arg() { - let arg_types = [ - WKB_GEOMETRY, - SedonaType::Arrow(DataType::Utf8), - SedonaType::Arrow(DataType::Utf8), - ]; - - let wkb = create_array(&[None, Some("POINT (79.3871 43.6426)")], &WKB_GEOMETRY); - - let scalar_args = vec![ - ScalarValue::Utf8(Some(WGS84.to_string())), - ScalarValue::Utf8(Some(NAD83ZONE6PROJ.to_string())), - ]; - - // Note: would be nice to have an epsilon of tolerance when validating - let expected = create_array_value( + // Invoke with scalar to argument (returns type-level CRS) + let expected_array = create_array( &[None, Some("POINT (-21508577.363421552 34067918.06097863)")], - &SedonaType::Wkb(Edges::Planar, Some(get_crs(NAD83ZONE6PROJ).unwrap())), + &expected_return_type, ); + let array_in = create_array(&[None, Some("POINT (79.3871 43.6426)")], &geometry_input); + let crs_from = create_array!(Utf8, [None, Some(WGS84)]) as ArrayRef; + let result = tester + .invoke_array_array_scalar(array_in, crs_from, NAD83ZONE6PROJ) + .unwrap(); + assert_array_equal(&result, &expected_array); + + // Invoke with array to argument (returns item CRS) + let expected_array = create_array_item_crs( + &[None, Some("POINT (-21508577.363421552 34067918.06097863)")], + [None, Some(NAD83ZONE6PROJ)], + &WKB_GEOMETRY, + ); + let array_in = create_array(&[None, Some("POINT (79.3871 43.6426)")], &WKB_GEOMETRY); + let crs_from = create_array!(Utf8, [None, Some(WGS84)]) as ArrayRef; + let crs_to = create_array!(Utf8, [None, Some(NAD83ZONE6PROJ)]) as ArrayRef; + let result = tester + .invoke_arrays(vec![array_in, crs_from, crs_to]) + .unwrap(); + assert_array_equal(&result, &expected_array); + } - let (result_type, result_col) = - invoke_udf_test(wkb.clone(), scalar_args, arg_types.to_vec()).unwrap(); - assert_value_equal(&result_col, &expected); - assert_eq!( - result_type, - SedonaType::Wkb(Edges::Planar, Some(get_crs(NAD83ZONE6PROJ).unwrap())) + #[test] + fn test_invoke_null_crs_to() { + let udf = SedonaScalarUDF::from_impl("st_transform", st_transform_impl()); + let tester = ScalarUdfTester::new( + udf.clone().into(), + vec![WKB_GEOMETRY, SedonaType::Arrow(DataType::Utf8)], ); - // Test with integer SRIDs - let arg_types = [ - WKB_GEOMETRY, - SedonaType::Arrow(DataType::Int32), - SedonaType::Arrow(DataType::Int32), - ]; - - let scalar_args = vec![ - ScalarValue::Int32(Some(4326)), - ScalarValue::Int32(Some(2230)), - ]; - - let (result_type, result_col) = - invoke_udf_test(wkb, scalar_args, arg_types.to_vec()).unwrap(); - assert_value_equal(&result_col, &expected); - assert_eq!( - result_type, - SedonaType::Wkb(Edges::Planar, Some(get_crs(NAD83ZONE6PROJ).unwrap())) + // A null scalar CRS should generate WKB_GEOMETRY output with a type + // level CRS that is unset; however, all the output will be null. + let result = tester + .invoke_scalar_scalar("POINT (0 1)", ScalarValue::Null) + .unwrap(); + assert_eq!(result, create_scalar(None, &WKB_GEOMETRY)); + + let expected_array = create_array(&[None, None, None], &WKB_GEOMETRY); + let array_in = create_array( + &[ + Some("POINT (0 1)"), + Some("POINT (1 2)"), + Some("POINT (2 3)"), + ], + &WKB_GEOMETRY, + ); + let result = tester + .invoke_array_scalar(array_in, ScalarValue::Null) + .unwrap(); + assert_array_equal(&result, &expected_array); + + // This currently has a side effect of working even though there is not + // valid transform from lnglat() to an unset CRS (because no transformations + // will ever take place). + let geometry_input = SedonaType::Wkb(Edges::Planar, lnglat()); + let tester = ScalarUdfTester::new( + udf.clone().into(), + vec![geometry_input, SedonaType::Arrow(DataType::Utf8)], ); + let result = tester + .invoke_scalar_scalar("POINT (0 1)", ScalarValue::Null) + .unwrap(); + assert_eq!(result, create_scalar(None, &WKB_GEOMETRY)); } - fn get_crs(auth_code: &str) -> Crs { - deserialize_crs(auth_code).unwrap() + #[test] + fn test_invoke_unset_crs_to() { + let udf = SedonaScalarUDF::from_impl("st_transform", st_transform_impl()); + let tester = ScalarUdfTester::new( + udf.clone().into(), + vec![WKB_GEOMETRY, SedonaType::Arrow(DataType::Int32)], + ); + + // A unset scalar CRS should generate WKB_GEOMETRY output with a type + // level CRS that is unset. This transformation is only valid if the input + // also has unset CRSes (and the result is a noop). + let result = tester.invoke_scalar_scalar("POINT (0 1)", 0).unwrap(); + assert_eq!(result, create_scalar(Some("POINT (0 1)"), &WKB_GEOMETRY)); + + let array_in = create_array( + &[ + Some("POINT (0 1)"), + Some("POINT (1 2)"), + Some("POINT (2 3)"), + ], + &WKB_GEOMETRY, + ); + let result = tester.invoke_array_scalar(array_in.clone(), 0).unwrap(); + assert_array_equal(&result, &array_in); + + // This should fail, because there is no valid transform between lnglat() + // and an unset CRS. + let geometry_input = SedonaType::Wkb(Edges::Planar, lnglat()); + let tester = ScalarUdfTester::new( + udf.clone().into(), + vec![geometry_input, SedonaType::Arrow(DataType::Int32)], + ); + let err = tester.invoke_scalar_scalar("POINT (0 1)", 0).unwrap_err(); + assert_eq!( + err.message(), + "Can't transform to or from an unset CRS. Do you need to call ST_SetSRID on the input?" + ); } - fn invoke_udf_test( - wkb: ArrayRef, - scalar_args: Vec, - arg_types: Vec, - ) -> Result<(SedonaType, ColumnarValue)> { + #[test] + fn invalid_arg_types() { let udf = SedonaScalarUDF::from_impl("st_transform", st_transform_impl()); - let arg_fields: Vec> = arg_types - .into_iter() - .map(|arg_type| Arc::new(arg_type.to_storage_field("", true).unwrap())) - .collect(); - let row_count = wkb.len(); - - let mut scalar_args_fields: Vec> = vec![None]; - let mut arg_vals: Vec = vec![ColumnarValue::Array(Arc::new(wkb))]; + // No args + let tester = ScalarUdfTester::new(udf.clone().into(), vec![]); + let err = tester.return_type().unwrap_err(); + assert_eq!( + err.message(), + "st_transform([]): No kernel matching arguments" + ); - for scalar_arg in &scalar_args { - scalar_args_fields.push(Some(scalar_arg)); - arg_vals.push(scalar_arg.clone().into()); - } + // Too many args + let tester = ScalarUdfTester::new( + udf.clone().into(), + vec![ + SedonaType::Arrow(DataType::Utf8), + SedonaType::Arrow(DataType::Utf8), + SedonaType::Arrow(DataType::Utf8), + SedonaType::Arrow(DataType::Utf8), + ], + ); + let err = tester.return_type().unwrap_err(); + assert_eq!( + err.message(), + "st_transform([Arrow(Utf8), Arrow(Utf8), Arrow(Utf8), Arrow(Utf8)]): No kernel matching arguments" + ); - let return_field_args = ReturnFieldArgs { - arg_fields: &arg_fields, - scalar_arguments: &scalar_args_fields, - }; + // First arg not geometry + let tester = ScalarUdfTester::new( + udf.clone().into(), + vec![ + SedonaType::Arrow(DataType::Utf8), + SedonaType::Arrow(DataType::Utf8), + ], + ); + let err = tester.return_type().unwrap_err(); + assert_eq!( + err.message(), + "st_transform([Arrow(Utf8), Arrow(Utf8)]): No kernel matching arguments" + ); - let return_field = udf.return_field_from_args(return_field_args)?; - let return_type = SedonaType::from_storage_field(&return_field)?; + // Second arg not string or numeric + let tester = ScalarUdfTester::new( + udf.clone().into(), + vec![WKB_GEOMETRY, SedonaType::Arrow(DataType::Boolean)], + ); + let err = tester.return_type().unwrap_err(); + assert_eq!( + err.message(), + "st_transform([Wkb(Planar, None), Arrow(Boolean)]): No kernel matching arguments" + ); - let args = ScalarFunctionArgs { - args: arg_vals, - arg_fields: arg_fields.to_vec(), - number_rows: row_count, - return_field, - config_options: Arc::new(ConfigOptions::default()), - }; + // third arg not string or numeric + let tester = ScalarUdfTester::new( + udf.clone().into(), + vec![ + WKB_GEOMETRY, + SedonaType::Arrow(DataType::Utf8), + SedonaType::Arrow(DataType::Boolean), + ], + ); + let err = tester.return_type().unwrap_err(); + assert_eq!( + err.message(), + "st_transform([Wkb(Planar, None), Arrow(Utf8), Arrow(Boolean)]): No kernel matching arguments" + ); + } - let value = udf.invoke_with_args(args)?; - Ok((return_type, value)) + fn get_crs(auth_code: &str) -> Crs { + deserialize_crs(auth_code).unwrap() } } diff --git a/rust/sedona-functions/src/executor.rs b/rust/sedona-functions/src/executor.rs index 0686070fd..f1d679d28 100644 --- a/rust/sedona-functions/src/executor.rs +++ b/rust/sedona-functions/src/executor.rs @@ -18,7 +18,7 @@ use std::iter::zip; use arrow_array::ArrayRef; use arrow_schema::DataType; -use datafusion_common::cast::{as_binary_array, as_binary_view_array}; +use datafusion_common::cast::{as_binary_array, as_binary_view_array, as_struct_array}; use datafusion_common::error::Result; use datafusion_common::{DataFusionError, ScalarValue}; use datafusion_expr::ColumnarValue; @@ -357,6 +357,15 @@ impl IterGeo for ArrayRef { } SedonaType::Wkb(_, _) => iter_wkb_binary(as_binary_array(self)?, func), SedonaType::WkbView(_, _) => iter_wkb_binary(as_binary_view_array(self)?, func), + SedonaType::Arrow(DataType::Struct(fields)) + if fields.len() == 2 && fields[0].name() == "item" && fields[1].name() == "crs" => + { + let struct_array = as_struct_array(self)?; + let item_type = SedonaType::from_storage_field(&fields[0])?; + struct_array + .column(0) + .iter_as_wkb_bytes(&item_type, num_iterations, func) + } _ => { // We could cast here as a fallback, iterate and cast per-element, or // implement iter_as_something_else()/supports_iter_xxx() when more geo array types diff --git a/rust/sedona-testing/src/testers.rs b/rust/sedona-testing/src/testers.rs index 3e939b32c..54b947a36 100644 --- a/rust/sedona-testing/src/testers.rs +++ b/rust/sedona-testing/src/testers.rs @@ -570,8 +570,18 @@ impl ScalarUdfTester { } pub fn invoke(&self, args: Vec) -> Result { - self.invoke_with_return_type(args, None) + let scalar_args = args + .iter() + .map(|arg| match arg { + ColumnarValue::Array(_) => None, + ColumnarValue::Scalar(scalar_value) => Some(scalar_value.clone()), + }) + .collect::>(); + + let return_type = self.return_type_with_scalars_inner(&scalar_args)?; + self.invoke_with_return_type(args, Some(return_type)) } + pub fn invoke_with_return_type( &self, args: Vec,