diff --git a/python/sedonadb/tests/test_sjoin.py b/python/sedonadb/tests/test_sjoin.py index d0fe86e53..2a3728ff1 100644 --- a/python/sedonadb/tests/test_sjoin.py +++ b/python/sedonadb/tests/test_sjoin.py @@ -76,3 +76,65 @@ def test_spatial_join(join_type, on): sedonadb_results = eng_sedonadb.execute_and_collect(sql).to_pandas() assert len(sedonadb_results) > 0 eng_postgis.assert_query_result(sql, sedonadb_results) + + +@pytest.mark.parametrize( + "join_type", ["INNER JOIN", "LEFT OUTER JOIN", "RIGHT OUTER JOIN"] +) +@pytest.mark.parametrize( + "on", + [ + "ST_Intersects(sjoin_geog1.geog, sjoin_geog2.geog)", + "ST_Distance(sjoin_geog1.geog, sjoin_geog2.geog) < 100000", + ], +) +def test_spatial_join_geography(join_type, on): + eng_sedonadb = SedonaDB.create_or_skip() + eng_postgis = PostGIS.create_or_skip() + + # Select two sets of bounding boxes that cross the antimeridian, + # which would be disjoint on a Euclidean plane. A geography join will produce non-empty results, + # whereas a geometry join would not. + west_most_bound = [-190, -10, -170, 10] + east_most_bound = [170, -10, 190, 10] + options = json.dumps( + { + "geom_type": "Point", + "num_parts_range": [2, 10], + "vertices_per_linestring_range": [2, 10], + "bounds": west_most_bound, + "size_range": [0.1, 5], + "seed": 42, + } + ) + df_point = eng_sedonadb.execute_and_collect( + f"SELECT id, ST_SetSRID(ST_GeogFromWKB(ST_AsBinary(geometry)), 4326) geog, dist FROM sd_random_geometry('{options}') LIMIT 100" + ) + options = json.dumps( + { + "geom_type": "Polygon", + "polygon_hole_rate": 0.5, + "num_parts_range": [2, 10], + "vertices_per_linestring_range": [2, 10], + "bounds": east_most_bound, + "size_range": [0.1, 5], + "seed": 43, + } + ) + df_polygon = eng_sedonadb.execute_and_collect( + f"SELECT id, ST_SetSRID(ST_GeogFromWKB(ST_AsBinary(geometry)), 4326) geog, dist FROM sd_random_geometry('{options}') LIMIT 100" + ) + eng_sedonadb.create_table_arrow("sjoin_geog1", df_point) + eng_sedonadb.create_table_arrow("sjoin_geog2", df_polygon) + eng_postgis.create_table_arrow("sjoin_geog1", df_point) + eng_postgis.create_table_arrow("sjoin_geog2", df_polygon) + + sql = f""" + SELECT sjoin_geog1.id id0, sjoin_geog2.id id1 + FROM sjoin_geog1 {join_type} sjoin_geog2 + ON {on} + ORDER BY id0, id1 + """ + + sedonadb_results = eng_sedonadb.execute_and_collect(sql).to_pandas() + eng_postgis.assert_query_result(sql, sedonadb_results) diff --git a/rust/sedona-expr/src/spatial_filter.rs b/rust/sedona-expr/src/spatial_filter.rs index 64feda29b..da4baf1d3 100644 --- a/rust/sedona-expr/src/spatial_filter.rs +++ b/rust/sedona-expr/src/spatial_filter.rs @@ -26,7 +26,7 @@ use datafusion_physical_expr::{ use geo_traits::Dimensions; use sedona_common::sedona_internal_err; use sedona_geometry::{bounding_box::BoundingBox, bounds::wkb_bounds_xy, interval::IntervalTrait}; -use sedona_schema::datatypes::SedonaType; +use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher}; use crate::{ statistics::GeoStatistics, @@ -185,6 +185,9 @@ impl SpatialFilter { match (&args[0], &args[1]) { (ArgRef::Col(column), ArgRef::Lit(literal)) | (ArgRef::Lit(literal), ArgRef::Col(column)) => { + if !is_prunable_geospatial_literal(literal) { + return Ok(Some(Self::Unknown)); + } match literal_bounds(literal) { Ok(literal_bounds) => { Ok(Some(Self::Intersects(column.clone(), literal_bounds))) @@ -204,6 +207,9 @@ impl SpatialFilter { match (&args[0], &args[1]) { (ArgRef::Col(column), ArgRef::Lit(literal)) => { // column within/covered_by literal -> Intersects filter + if !is_prunable_geospatial_literal(literal) { + return Ok(Some(Self::Unknown)); + } match literal_bounds(literal) { Ok(literal_bounds) => { Ok(Some(Self::Intersects(column.clone(), literal_bounds))) @@ -213,6 +219,9 @@ impl SpatialFilter { } (ArgRef::Lit(literal), ArgRef::Col(column)) => { // literal within/covered_by column -> Covers filter + if !is_prunable_geospatial_literal(literal) { + return Ok(Some(Self::Unknown)); + } match literal_bounds(literal) { Ok(literal_bounds) => { Ok(Some(Self::Covers(column.clone(), literal_bounds))) @@ -233,6 +242,9 @@ impl SpatialFilter { (ArgRef::Col(column), ArgRef::Lit(literal)) => { // column contains/covers literal -> Covers filter // (column's bbox must fully cover literal's bbox) + if !is_prunable_geospatial_literal(literal) { + return Ok(Some(Self::Unknown)); + } match literal_bounds(literal) { Ok(literal_bounds) => { Ok(Some(Self::Covers(column.clone(), literal_bounds))) @@ -243,6 +255,9 @@ impl SpatialFilter { (ArgRef::Lit(literal), ArgRef::Col(column)) => { // literal contains/covers column -> Intersects filter // (if literal contains column, they must at least intersect) + if !is_prunable_geospatial_literal(literal) { + return Ok(Some(Self::Unknown)); + } match literal_bounds(literal) { Ok(literal_bounds) => { Ok(Some(Self::Intersects(column.clone(), literal_bounds))) @@ -284,6 +299,9 @@ impl SpatialFilter { match (&args[0], &args[1], &args[2]) { (ArgRef::Col(column), ArgRef::Lit(literal), ArgRef::Lit(distance)) | (ArgRef::Lit(literal), ArgRef::Col(column), ArgRef::Lit(distance)) => { + if !is_prunable_geospatial_literal(literal) { + return Ok(Some(Self::Unknown)); + } match ( literal_bounds(literal), distance.value().cast_to(&DataType::Float64)?, @@ -314,6 +332,19 @@ enum ArgRef<'a> { Other, } +/// Our current spatial data pruning implementation does not correctly handle geography data. +/// We therefore only consider geometry data type for pruning. +fn is_prunable_geospatial_literal(literal: &Literal) -> bool { + let Ok(literal_field) = literal.return_field(&Schema::empty()) else { + return false; + }; + let Ok(sedona_type) = SedonaType::from_storage_field(&literal_field) else { + return false; + }; + let matcher = ArgMatcher::is_geometry(); + matcher.match_type(&sedona_type) +} + fn literal_bounds(literal: &Literal) -> Result { let literal_field = literal.return_field(&Schema::empty())?; let sedona_type = SedonaType::from_storage_field(&literal_field)?; @@ -348,12 +379,11 @@ fn parse_args(args: &[Arc]) -> Vec> { #[cfg(test)] mod test { - use arrow_schema::{DataType, Field}; use datafusion_expr::{ScalarUDF, Signature, SimpleScalarUDF, Volatility}; use rstest::rstest; use sedona_geometry::{bounding_box::BoundingBox, interval::Interval}; - use sedona_schema::datatypes::WKB_GEOMETRY; + use sedona_schema::datatypes::{WKB_GEOGRAPHY, WKB_GEOMETRY}; use sedona_testing::create::create_scalar; use super::*; @@ -806,6 +836,111 @@ mod test { )); } + #[rstest] + fn range_predicate_involving_geography_should_be_transformed_to_unknown( + #[values( + "st_intersects", + "st_equals", + "st_touches", + "st_contains", + "st_covers", + "st_within", + "st_covered_by", + "st_coveredby" + )] + func_name: &str, + ) { + let column: Arc = Arc::new(Column::new("geometry", 0)); + let storage_field = WKB_GEOGRAPHY.to_storage_field("", true).unwrap(); + let literal: Arc = Arc::new(Literal::new_with_metadata( + create_scalar(Some("POLYGON ((0 0, 2 0, 2 2, 0 2, 0 0))"), &WKB_GEOGRAPHY), + Some(storage_field.metadata().into()), + )); + + let func = create_dummy_spatial_function(func_name, 2); + let expr: Arc = Arc::new(ScalarFunctionExpr::new( + func_name, + Arc::new(func.clone()), + vec![column.clone(), literal.clone()], + Arc::new(Field::new("", DataType::Boolean, true)), + )); + let predicate = SpatialFilter::try_from_expr(&expr).unwrap(); + assert!( + matches!(predicate, SpatialFilter::Unknown), + "Function {func_name} involving geography should produce Unknown filter" + ); + } + + #[test] + fn distance_predicate_involving_geography_should_be_transformed_to_unknown() { + let column: Arc = Arc::new(Column::new("geometry", 0)); + let storage_field = WKB_GEOGRAPHY.to_storage_field("", true).unwrap(); + let literal: Arc = Arc::new(Literal::new_with_metadata( + create_scalar(Some("POINT (1 2)"), &WKB_GEOGRAPHY), + Some(storage_field.metadata().into()), + )); + let distance_literal: Arc = + Arc::new(Literal::new(ScalarValue::Float64(Some(100.0)))); + + // Test ST_DWithin function + let st_dwithin = create_dummy_spatial_function("st_dwithin", 3); + let dwithin_expr: Arc = Arc::new(ScalarFunctionExpr::new( + "st_dwithin", + Arc::new(st_dwithin.clone()), + vec![column.clone(), literal.clone(), distance_literal.clone()], + Arc::new(Field::new("", DataType::Boolean, true)), + )); + let predicate = SpatialFilter::try_from_expr(&dwithin_expr).unwrap(); + assert!( + matches!(predicate, SpatialFilter::Unknown), + "ST_DWithin involving geography should produce Unknown filter" + ); + + // Test ST_DWithin with reversed geometry arguments + let dwithin_expr_reversed: Arc = Arc::new(ScalarFunctionExpr::new( + "st_dwithin", + Arc::new(st_dwithin), + vec![literal.clone(), column.clone(), distance_literal.clone()], + Arc::new(Field::new("", DataType::Boolean, true)), + )); + let predicate_reversed = SpatialFilter::try_from_expr(&dwithin_expr_reversed).unwrap(); + assert!( + matches!(predicate_reversed, SpatialFilter::Unknown), + "ST_DWithin involving geography should produce Unknown filter" + ); + + // Test ST_Distance <= threshold + let st_distance = create_dummy_spatial_function("st_distance", 2); + let distance_expr: Arc = Arc::new(ScalarFunctionExpr::new( + "st_distance", + Arc::new(st_distance.clone()), + vec![column.clone(), literal.clone()], + Arc::new(Field::new("", DataType::Boolean, true)), + )); + let comparison_expr: Arc = Arc::new(BinaryExpr::new( + distance_expr.clone(), + Operator::LtEq, + distance_literal.clone(), + )); + let predicate = SpatialFilter::try_from_expr(&comparison_expr).unwrap(); + assert!( + matches!(predicate, SpatialFilter::Unknown), + "ST_Distance <= threshold involving geography should produce Unknown filter" + ); + + // Test threshold >= ST_Distance + let comparison_expr_reversed: Arc = Arc::new(BinaryExpr::new( + distance_literal.clone(), + Operator::GtEq, + distance_expr.clone(), + )); + let predicate_reversed = SpatialFilter::try_from_expr(&comparison_expr_reversed).unwrap(); + assert!( + matches!(predicate_reversed, SpatialFilter::Unknown), + "threshold >= ST_Distance involving geography should produce Unknown filter" + ); + } + #[test] fn predicate_from_expr_has_z() { let column: Arc = Arc::new(Column::new("geometry", 0)); diff --git a/rust/sedona-spatial-join/src/exec.rs b/rust/sedona-spatial-join/src/exec.rs index d9506e42b..fdbef75e9 100644 --- a/rust/sedona-spatial-join/src/exec.rs +++ b/rust/sedona-spatial-join/src/exec.rs @@ -633,7 +633,7 @@ mod tests { use geo_types::{Coord, Rect}; use rstest::rstest; use sedona_geometry::types::GeometryTypeId; - use sedona_schema::datatypes::WKB_GEOMETRY; + use sedona_schema::datatypes::{SedonaType, WKB_GEOGRAPHY, WKB_GEOMETRY}; use sedona_testing::datagen::RandomPartitionedDataBuilder; use tokio::sync::OnceCell; @@ -649,12 +649,13 @@ mod tests { /// Creates standard test data with left (Polygon) and right (Point) partitions fn create_default_test_data() -> Result<(TestPartitions, TestPartitions)> { - create_test_data_with_size_range((1.0, 10.0)) + create_test_data_with_size_range((1.0, 10.0), WKB_GEOMETRY) } /// Creates test data with custom size range fn create_test_data_with_size_range( size_range: (f64, f64), + sedona_type: SedonaType, ) -> Result<(TestPartitions, TestPartitions)> { let bounds = Rect::new(Coord { x: 0.0, y: 0.0 }, Coord { x: 100.0, y: 100.0 }); @@ -664,7 +665,7 @@ mod tests { .batches_per_partition(2) .rows_per_batch(30) .geometry_type(GeometryTypeId::Polygon) - .sedona_type(WKB_GEOMETRY) + .sedona_type(sedona_type.clone()) .bounds(bounds) .size_range(size_range) .null_rate(0.1) @@ -676,7 +677,7 @@ mod tests { .batches_per_partition(4) .rows_per_batch(30) .geometry_type(GeometryTypeId::Point) - .sedona_type(WKB_GEOMETRY) + .sedona_type(sedona_type) .bounds(bounds) .size_range(size_range) .null_rate(0.1) @@ -928,7 +929,7 @@ mod tests { #[tokio::test] async fn test_spatial_join_with_filter() -> Result<()> { let ((left_schema, left_partitions), (right_schema, right_partitions)) = - create_test_data_with_size_range((0.1, 10.0))?; + create_test_data_with_size_range((0.1, 10.0), WKB_GEOMETRY)?; for max_batch_size in [10, 30, 100] { let options = SpatialJoinOptions { @@ -996,6 +997,37 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_geography_join_is_not_optimized() -> Result<()> { + let options = SpatialJoinOptions::default(); + let ctx = setup_context(Some(options), 10)?; + + // Prepare geography tables + let ((left_schema, left_partitions), (right_schema, right_partitions)) = + create_test_data_with_size_range((0.1, 10.0), WKB_GEOGRAPHY)?; + let mem_table_left: Arc = + Arc::new(MemTable::try_new(left_schema, left_partitions)?); + let mem_table_right: Arc = + Arc::new(MemTable::try_new(right_schema, right_partitions)?); + ctx.register_table("L", mem_table_left)?; + ctx.register_table("R", mem_table_right)?; + + // Execute geography join query + let df = ctx + .sql("SELECT * FROM L JOIN R ON ST_Intersects(L.geometry, R.geometry)") + .await?; + let plan = df.create_physical_plan().await?; + + // Verify that no SpatialJoinExec is present (geography join should not be optimized) + let spatial_joins = collect_spatial_join_exec(&plan)?; + assert!( + spatial_joins.is_empty(), + "Geography joins should not be optimized to SpatialJoinExec" + ); + + Ok(()) + } + async fn test_with_join_types(join_type: JoinType) -> Result { let ((left_schema, left_partitions), (right_schema, right_partitions)) = create_test_data_with_empty_partitions()?; diff --git a/rust/sedona-spatial-join/src/optimizer.rs b/rust/sedona-spatial-join/src/optimizer.rs index a576014c7..6440da264 100644 --- a/rust/sedona-spatial-join/src/optimizer.rs +++ b/rust/sedona-spatial-join/src/optimizer.rs @@ -20,7 +20,7 @@ use crate::exec::SpatialJoinExec; use crate::spatial_predicate::{ DistancePredicate, KNNPredicate, RelationPredicate, SpatialPredicate, SpatialRelationType, }; -use arrow_schema::SchemaRef; +use arrow_schema::{Schema, SchemaRef}; use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan; use datafusion::{ config::ConfigOptions, execution::session_state::SessionStateBuilder, @@ -41,6 +41,8 @@ use datafusion_physical_plan::joins::{HashJoinExec, NestedLoopJoinExec}; use datafusion_physical_plan::{joins::utils::JoinFilter, ExecutionPlan}; use sedona_common::{option::SedonaOptions, sedona_internal_err}; use sedona_expr::utils::{parse_distance_predicate, ParsedDistancePredicate}; +use sedona_schema::datatypes::SedonaType; +use sedona_schema::matchers::ArgMatcher; /// Physical planner extension for spatial joins /// @@ -140,6 +142,15 @@ impl SpatialJoinOptimizer { let right = nested_loop_join.right().clone(); let join_type = nested_loop_join.join_type(); + // Check if the geospatial types involved in spatial_predicate are supported + if !is_spatial_predicate_supported( + &spatial_predicate, + &left.schema(), + &right.schema(), + ) { + return Ok(None); + } + // Create the spatial join let spatial_join = SpatialJoinExec::try_new( left, @@ -174,6 +185,15 @@ impl SpatialJoinOptimizer { return Ok(None); } + // Check if the geospatial types involved in spatial_predicate are supported (planar geometries only) + if !is_spatial_predicate_supported( + &spatial_predicate, + &hash_join.left().schema(), + &hash_join.right().schema(), + ) { + return Ok(None); + } + // Extract the equi-join conditions and convert them to a filter let equi_filter = self.create_equi_filter_from_hash_join(hash_join)?; @@ -853,6 +873,34 @@ fn replace_join_filter_expr(expr: &Arc, join_filter: &JoinFilt ) } +fn is_spatial_predicate_supported( + spatial_predicate: &SpatialPredicate, + left_schema: &Schema, + right_schema: &Schema, +) -> bool { + /// Only spatial predicates working with planar geometry are supported for optimization. + /// Geography (spherical) types are explicitly excluded and will not trigger optimized spatial joins. + fn is_geometry_type_supported(expr: &Arc, schema: &Schema) -> bool { + let Ok(left_return_field) = expr.return_field(schema) else { + return false; + }; + let Ok(sedona_type) = SedonaType::from_storage_field(&left_return_field) else { + return false; + }; + let matcher = ArgMatcher::is_geometry(); + matcher.match_type(&sedona_type) + } + + match spatial_predicate { + SpatialPredicate::Relation(RelationPredicate { left, right, .. }) + | SpatialPredicate::Distance(DistancePredicate { left, right, .. }) + | SpatialPredicate::KNearestNeighbors(KNNPredicate { left, right, .. }) => { + is_geometry_type_supported(left, left_schema) + && is_geometry_type_supported(right, right_schema) + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -865,7 +913,7 @@ mod tests { use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr}; use datafusion_physical_plan::joins::utils::ColumnIndex; use datafusion_physical_plan::joins::utils::JoinFilter; - use sedona_schema::datatypes::WKB_GEOMETRY; + use sedona_schema::datatypes::{WKB_GEOGRAPHY, WKB_GEOMETRY}; use std::sync::Arc; // Helper function to create a test schema @@ -2418,4 +2466,39 @@ mod tests { let result = transform_join_filter(&join_filter); assert!(result.is_none()); // Should fail - k must be a literal value } + + #[test] + fn test_is_spatial_predicate_supported() { + // Planar geometry field + let geom_field = WKB_GEOMETRY.to_storage_field("geom", false).unwrap(); + let schema = Arc::new(Schema::new(vec![geom_field.clone()])); + let col_expr = Arc::new(Column::new("geom", 0)) as Arc; + let rel_pred = RelationPredicate::new( + col_expr.clone(), + col_expr.clone(), + SpatialRelationType::Intersects, + ); + let spatial_pred = SpatialPredicate::Relation(rel_pred); + assert!(super::is_spatial_predicate_supported( + &spatial_pred, + &schema, + &schema + )); + + // Geography field (should NOT be supported) + let geog_field = WKB_GEOGRAPHY.to_storage_field("geog", false).unwrap(); + let geog_schema = Arc::new(Schema::new(vec![geog_field.clone()])); + let geog_col_expr = Arc::new(Column::new("geog", 0)) as Arc; + let rel_pred_geog = RelationPredicate::new( + geog_col_expr.clone(), + geog_col_expr.clone(), + SpatialRelationType::Intersects, + ); + let spatial_pred_geog = SpatialPredicate::Relation(rel_pred_geog); + assert!(!super::is_spatial_predicate_supported( + &spatial_pred_geog, + &geog_schema, + &geog_schema + )); + } }