Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

name: python

on:
Expand Down Expand Up @@ -73,14 +74,18 @@ jobs:
run: |
pip install -e "python/sedonadb/[test]" -vv

- name: Download minimal geoarrow-data assets
run: |
python submodules/download-assets.py "*water-junc*" "*water-point*"

- name: Start PostGIS
run: |
docker compose up --wait --detach postgis

- name: Run tests
env:
# Ensure that there are no skips for PostGIS or duckdb not available
SEDONADB_PYTHON_ENSURE_ALL_ENGINES: "true"
# Ensure that we don't skip tests that we didn't intend to
SEDONADB_PYTHON_NO_SKIP_TESTS: "true"
run: |
cd python
python -m pytest -vv
Expand Down
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@
[submodule "c/sedona-s2geography/s2geometry"]
path = c/sedona-s2geography/s2geometry
url = https://github.com/google/s2geometry.git
[submodule "submodules/sedona-testing"]
path = submodules/sedona-testing
url = https://github.com/apache/sedona-testing.git
33 changes: 30 additions & 3 deletions python/sedonadb/python/sedonadb/testing.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, List, Tuple

import geoarrow.pyarrow as ga
Expand All @@ -11,6 +12,29 @@
import sedonadb


def skip_if_not_exists(path: Path):
"""Skip a test using pytest.skip() if path does not exist

If SEDONADB_PYTHON_NO_SKIP_TESTS is set, this function will never skip to
avoid accidentally skipping tests on CI.
"""
if _env_no_skip():
return

if not path.exists():
import pytest

pytest.skip(
f"Test asset '{path}' not found. "
"Run submodules/download-assets.py to test with submodule assets"
)


def _env_no_skip():
env_no_skip = os.environ.get("SEDONADB_PYTHON_NO_SKIP_TESTS", "false")
return env_no_skip in ("true", "1")


class DBEngine:
"""Engine-agnostic catalog and SQL engine

Expand Down Expand Up @@ -43,12 +67,14 @@ def create_or_skip(cls, *args, **kwargs) -> "DBEngine":
This is the constructor that should be used in tests to ensure that integration
style tests don't cause failure for contributors working on Python-only
behaviour.

If SEDONADB_PYTHON_NO_SKIP_TESTS is set, this function will never skip to
avoid accidentally skipping tests on CI.
"""
import pytest

# Ensure we can force this to succeed (or fail in CI)
env_no_skip = os.environ.get("SEDONADB_PYTHON_ENSURE_ALL_ENGINES", "false")
if env_no_skip in ("true", "1"):
if _env_no_skip():
return cls(*args, **kwargs)

# By default, allow construction to fail (e.g., for contributors running
Expand Down Expand Up @@ -267,7 +293,8 @@ def create_table_arrow(self, name, obj) -> "SedonaDB":
return self

def execute_and_collect(self, query) -> "sedonadb.dataframe.DataFrame":
return self.con.sql(query).collect()
# Use to_arrow_table() to maintain ordering of the input table
return self.con.sql(query).to_arrow_table()


class DuckDB(DBEngine):
Expand Down
5 changes: 5 additions & 0 deletions python/sedonadb/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ def con():
@pytest.fixture()
def geoarrow_data():
return HERE.parent.parent.parent / "submodules" / "geoarrow-data"


@pytest.fixture()
def sedona_testing():
return HERE.parent.parent.parent / "submodules" / "sedona-testing"
134 changes: 134 additions & 0 deletions python/sedonadb/tests/io/test_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import pytest
import tempfile
import shapely
import geopandas
from pyarrow import parquet
from pathlib import Path
from sedonadb.testing import geom_or_null, SedonaDB, DuckDB, skip_if_not_exists


@pytest.mark.parametrize("name", ["water-junc", "water-point"])
def test_read_whole_geoparquet(geoarrow_data, name):
# Checks a read of some non-trivial files and ensures we match a GeoPandas read
eng = SedonaDB()
path = geoarrow_data / "ns-water" / "files" / f"ns-water_{name}_geo.parquet"
skip_if_not_exists(path)

gdf = geopandas.read_parquet(path).sort_values(by="OBJECTID").reset_index(drop=True)

eng.create_view_parquet("tab", path)
result = eng.execute_and_collect("""SELECT * FROM tab ORDER BY "OBJECTID";""")
eng.assert_result(result, gdf)


@pytest.mark.parametrize("name", ["geoparquet-1.0.0", "geoparquet-1.1.0", "plain"])
def test_read_sedona_testing(sedona_testing, name):
# Checks a read of trivial files (some GeoParquet and some not) against a DuckDB read
duckdb = DuckDB.create_or_skip()
sedonadb = SedonaDB()
path = sedona_testing / "data" / "parquet" / f"{name}.parquet"
skip_if_not_exists(path)

duckdb.create_view_parquet("tab", path)
result_duckdb = duckdb.execute_and_collect("SELECT * FROM tab")
df_duckdb = duckdb.result_to_pandas(result_duckdb)

# DuckDB never returns CRSes
kwargs = {}
if isinstance(df_duckdb, geopandas.GeoDataFrame):
kwargs["check_crs"] = False

sedonadb.create_view_parquet("tab", path)
sedonadb.assert_query_result("SELECT * FROM tab", df_duckdb, **kwargs)


@pytest.mark.parametrize("name", ["water-junc", "water-point"])
def test_read_geoparquet_pruned(geoarrow_data, name):
# Note that this doesn't check that pruning actually occurred, just that
# for a query where we should be pruning automatically that we don't omit results.
eng = SedonaDB()
path = geoarrow_data / "ns-water" / "files" / f"ns-water_{name}_geo.parquet"
skip_if_not_exists(path)

# Roughly a diamond around Gaspereau Lake, Nova Scotia, in UTM zone 20
wkt_filter = """
POLYGON ((
371000 4978000, 376000 4972000, 381000 4978000,
376000 4983000, 371000 4978000
))
"""
poly_filter = shapely.from_wkt(wkt_filter)

gdf = geopandas.read_parquet(path)
gdf = (
gdf[gdf.geometry.intersects(poly_filter)]
.sort_values(by="OBJECTID")
.reset_index(drop=True)
)
gdf = gdf[["OBJECTID", "geometry"]]

# Make sure this isn't a bogus test
assert len(gdf) > 0

with tempfile.TemporaryDirectory() as td:
# Write using GeoPandas, which implements GeoParquet 1.1 bbox covering
# Write tiny row groups so that many bounding boxes have to be checked
tmp_parquet = Path(td) / f"{name}.parquet"
geopandas.read_parquet(path).to_parquet(
tmp_parquet,
schema_version="1.1.0",
write_covering_bbox=True,
row_group_size=1024,
)

eng.create_view_parquet("tab", tmp_parquet)
result = eng.execute_and_collect(
f"""
SELECT "OBJECTID", geometry FROM tab
WHERE ST_Intersects(geometry, ST_SetSRID({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
ORDER BY "OBJECTID";
"""
)
eng.assert_result(result, gdf)

# Write a dataset with one file per row group to check file pruning correctness
ds_dir = Path(td) / "ds"
ds_dir.mkdir()
ds_paths = []

with parquet.ParquetFile(tmp_parquet) as f:
for i in range(f.metadata.num_row_groups):
tab = f.read_row_group(i, ["OBJECTID", "geometry"])
df = geopandas.GeoDataFrame.from_arrow(tab)
ds_path = ds_dir / f"file{i}.parquet"
df.to_parquet(ds_path)
ds_paths.append(ds_path)

# Check a query against the same dataset without the bbox column but with file-level
# geoparquet metadata bounding boxes
eng.create_view_parquet("tab_dataset", ds_paths)
result = eng.execute_and_collect(
f"""
SELECT * FROM tab_dataset
WHERE ST_Intersects(geometry, ST_SetSRID({geom_or_null(wkt_filter)}, '{gdf.crs.to_json()}'))
ORDER BY "OBJECTID";
"""
)
eng.assert_result(result, gdf)
63 changes: 44 additions & 19 deletions submodules/download-assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import json
import urllib.request
import shutil
Expand All @@ -22,22 +23,46 @@

HERE = Path(__file__).parent

with open(HERE / "geoarrow-data" / "manifest.json") as f:
manifest = json.load(f)

for group in manifest["groups"]:
group_name = group["name"]
for file in group["files"]:
url = file["url"]
filename = Path(url).name
local_path = HERE / "geoarrow-data" / group_name / "files" / filename
if local_path.exists():
print(f"Using cached '{filename}'")
elif file["format"] in ("parquet", "geoparquet"):
# Only download Parquet/GeoParquet versions of asset files to save space
print(f"Downloading {url}")
local_path.parent.mkdir(parents=True, exist_ok=True)
with urllib.request.urlopen(url) as fin, open(local_path, "wb") as fout:
shutil.copyfileobj(fin, fout)

print("Done!")

def download_files_lazy(include):
with open(HERE / "geoarrow-data" / "manifest.json") as f:
manifest = json.load(f)

for group in manifest["groups"]:
group_name = group["name"]
for file in group["files"]:
url = file["url"]
filename = Path(url).name
local_path = HERE / "geoarrow-data" / group_name / "files" / filename
if not path_match(local_path, include):
continue

if local_path.exists():
print(f"Using cached '{filename}'")
elif file["format"] in ("parquet", "geoparquet"):
# Only download Parquet/GeoParquet versions of asset files to save space
print(f"Downloading {url}")
local_path.parent.mkdir(parents=True, exist_ok=True)
with urllib.request.urlopen(url) as fin, open(local_path, "wb") as fout:
shutil.copyfileobj(fin, fout)

print("Done!")


def path_match(path, include):
for pattern in include:
if path.match(pattern):
return True

return False


if __name__ == "__main__":
import sys

include_patterns = sys.argv[1:]

if not include_patterns:
include_patterns = ["*"]

download_files_lazy(include_patterns)
1 change: 1 addition & 0 deletions submodules/sedona-testing
Submodule sedona-testing added at 5073f7