Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 167 additions & 0 deletions python/sedonadb/tests/test_sjoin.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,173 @@ def test_spatial_join(join_type, on):
eng_postgis.assert_query_result(sql, sedonadb_results)


@pytest.mark.parametrize(
    "join_type",
    [
        "LEFT SEMI JOIN",
        "LEFT ANTI JOIN",
        "RIGHT SEMI JOIN",
        "RIGHT ANTI JOIN",
    ],
)
@pytest.mark.parametrize(
    "on",
    [
        "ST_Intersects(sjoin_point.geometry, sjoin_polygon.geometry)",
        "ST_Within(sjoin_point.geometry, sjoin_polygon.geometry)",
        "ST_Contains(sjoin_polygon.geometry, sjoin_point.geometry)",
        "ST_DWithin(sjoin_point.geometry, sjoin_polygon.geometry, 1.0)",
        "ST_DWithin(sjoin_point.geometry, sjoin_polygon.geometry, sjoin_point.dist / 100)",
        "ST_DWithin(sjoin_point.geometry, sjoin_polygon.geometry, sjoin_polygon.dist / 100)",
    ],
)
def test_spatial_join_semi_anti(join_type, on):
    """Compare SedonaDB semi/anti spatial joins with EXISTS queries on PostGIS.

    SedonaDB runs the ``{LEFT,RIGHT} {SEMI,ANTI} JOIN`` directly. The same
    result is expressed for PostGIS as a correlated ``EXISTS`` (semi) or
    ``NOT EXISTS`` (anti) filter over the preserved side, because PostgreSQL
    has no SEMI/ANTI JOIN syntax.

    Args:
        join_type: One of the four semi/anti join keywords from the
            parametrization above.
        on: Spatial join predicate, used as the ``ON`` clause in SedonaDB and
            as the subquery ``WHERE`` clause in PostGIS.
    """
    # NOTE(review): create_or_skip presumably skips the test when an engine is
    # not available -- confirm against the testing helpers.
    with (
        SedonaDB.create_or_skip() as eng_sedonadb,
        PostGIS.create_or_skip() as eng_postgis,
    ):
        # Generate both inputs with SedonaDB; only geometry type and seed differ.
        generated = {}
        for table_name, geom_type, seed in (
            ("sjoin_point", "Point", 42),
            ("sjoin_polygon", "Polygon", 43),
        ):
            options = json.dumps(
                {
                    "geom_type": geom_type,
                    "polygon_hole_rate": 0.5,
                    "num_parts_range": [2, 10],
                    "vertices_per_linestring_range": [2, 10],
                    "seed": seed,
                }
            )
            generated[table_name] = eng_sedonadb.execute_and_collect(
                f"SELECT * FROM sd_random_geometry('{options}') LIMIT 100"
            )

        # Load the identical data into both engines (SedonaDB first, then PostGIS).
        for engine in (eng_sedonadb, eng_postgis):
            for table_name, table in generated.items():
                engine.create_table_arrow(table_name, table)

        is_semi = "SEMI" in join_type

        # SQL text is kept flush-left so the literals carry no leading indentation.
        if not join_type.startswith("LEFT"):
            # RIGHT joins preserve the polygon side.
            sedona_sql = f"""
SELECT sjoin_polygon.id id1
FROM sjoin_point {join_type} sjoin_polygon
ON {on}
ORDER BY id1
"""
            exists = f"EXISTS (SELECT 1 FROM sjoin_point WHERE {on})"
            if is_semi:
                where = exists
            else:
                where = f"NOT {exists}"
            postgis_sql = f"""
SELECT sjoin_polygon.id id1
FROM sjoin_polygon
WHERE {where}
ORDER BY id1
"""
        else:
            # LEFT joins preserve the point side.
            sedona_sql = f"""
SELECT sjoin_point.id id0
FROM sjoin_point {join_type} sjoin_polygon
ON {on}
ORDER BY id0
"""
            exists = f"EXISTS (SELECT 1 FROM sjoin_polygon WHERE {on})"
            if is_semi:
                where = exists
            else:
                where = f"NOT {exists}"
            postgis_sql = f"""
SELECT sjoin_point.id id0
FROM sjoin_point
WHERE {where}
ORDER BY id0
"""

        collected = eng_sedonadb.execute_and_collect(sedona_sql)
        sedonadb_results = collected.to_pandas()
        # An empty result would make the cross-engine comparison trivially pass.
        assert len(sedonadb_results) > 0
        eng_postgis.assert_query_result(postgis_sql, sedonadb_results)


@pytest.mark.parametrize(
    "outer",
    ["point", "polygon"],
)
@pytest.mark.parametrize(
    "on",
    [
        "ST_Intersects(sjoin_point.geometry, sjoin_polygon.geometry)",
        "ST_Within(sjoin_point.geometry, sjoin_polygon.geometry)",
        "ST_DWithin(sjoin_point.geometry, sjoin_polygon.geometry, 1.0)",
    ],
)
def test_spatial_mark_join_via_correlated_exists(outer, on):
    """Check that ``<cond> OR EXISTS (...)`` is planned as a spatial mark join.

    The query filters the outer table with ``id = 1 OR EXISTS (<correlated
    spatial subquery>)``. The test first asserts that SedonaDB's EXPLAIN
    output has a ``SpatialJoinExec`` line mentioning ``LeftMark`` or
    ``RightMark``, then runs the same SQL on both engines and compares the
    results.

    Args:
        outer: ``"point"`` to filter ``sjoin_point``; any other value (the
            parametrization supplies ``"polygon"``) filters ``sjoin_polygon``.
        on: Spatial predicate used inside the correlated subquery.
    """
    # NOTE(review): create_or_skip presumably skips the test when an engine is
    # not available -- confirm against the testing helpers.
    with (
        SedonaDB.create_or_skip() as eng_sedonadb,
        PostGIS.create_or_skip() as eng_postgis,
    ):
        # Generate both inputs with SedonaDB; only geometry type and seed differ.
        generated = {}
        for table_name, geom_type, seed in (
            ("sjoin_point", "Point", 42),
            ("sjoin_polygon", "Polygon", 43),
        ):
            options = json.dumps(
                {
                    "geom_type": geom_type,
                    "polygon_hole_rate": 0.5,
                    "num_parts_range": [2, 10],
                    "vertices_per_linestring_range": [2, 10],
                    "seed": seed,
                }
            )
            generated[table_name] = eng_sedonadb.execute_and_collect(
                f"SELECT * FROM sd_random_geometry('{options}') LIMIT 100"
            )

        # Load the identical data into both engines (SedonaDB first, then PostGIS).
        for engine in (eng_sedonadb, eng_postgis):
            for table_name, table in generated.items():
                engine.create_table_arrow(table_name, table)

        # SQL text is kept flush-left so the literals carry no leading indentation.
        if outer != "point":
            sql = f"""
SELECT sjoin_polygon.id id1, ST_AsBinary(sjoin_polygon.geometry) geom
FROM sjoin_polygon
WHERE sjoin_polygon.id = 1 OR EXISTS (SELECT 1 FROM sjoin_point WHERE {on})
ORDER BY id1
"""
        else:
            sql = f"""
SELECT sjoin_point.id id0
FROM sjoin_point
WHERE sjoin_point.id = 1 OR EXISTS (SELECT 1 FROM sjoin_polygon WHERE {on})
ORDER BY id0
"""

        # Verify the physical query plan contains a Mark join. The plan text is
        # read from the second column of the EXPLAIN output.
        query_plan = eng_sedonadb.execute_and_collect(f"EXPLAIN {sql}").to_pandas()
        plan_column = query_plan.iloc[:, 1]
        plan_text = "\n".join(plan_column.astype(str).tolist())
        mark_join_lines = [
            line
            for line in plan_text.splitlines()
            if "SpatialJoinExec" in line
            and ("LeftMark" in line or "RightMark" in line)
        ]
        # On failure, show the whole plan to make the planner's choice visible.
        assert mark_join_lines, plan_text

        collected = eng_sedonadb.execute_and_collect(sql)
        sedonadb_results = collected.to_pandas()
        # An empty result would make the cross-engine comparison trivially pass.
        assert len(sedonadb_results) > 0
        eng_postgis.assert_query_result(sql, sedonadb_results)


@pytest.mark.parametrize(
"join_type", ["INNER JOIN", "LEFT OUTER JOIN", "RIGHT OUTER JOIN"]
)
Expand Down
Loading