Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions python/sedonadb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ sedona-geoparquet = { workspace = true }
sedona-schema = { workspace = true }
sedona-proj = { workspace = true }
sedona-tg = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
mimalloc = { workspace = true, optional = true }
Expand Down
54 changes: 52 additions & 2 deletions python/sedonadb/python/sedonadb/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

import json
import os
import sys
from functools import cached_property
Expand Down Expand Up @@ -126,6 +127,7 @@ def read_parquet(
self,
table_paths: Union[str, Path, Iterable[str]],
options: Optional[Dict[str, Any]] = None,
geometry_columns: Optional[Union[str, Dict[str, Any]]] = None,
) -> DataFrame:
"""Create a [DataFrame][sedonadb.dataframe.DataFrame] from one or more Parquet files

Expand All @@ -134,24 +136,72 @@ def read_parquet(
files.
options: Optional dictionary of options to pass to the Parquet reader.
For S3 access, use {"aws.skip_signature": True, "aws.region": "us-west-2"} for anonymous access to public buckets.
geometry_columns: Optional JSON string or dict mapping column name to
GeoParquet column metadata (e.g.,
{"geom": {"encoding": "WKB"}}). Use this to mark binary WKB
columns as geometry columns or correct metadata such as the
column CRS.

Supported keys:
- encoding: "WKB" (required)
- crs: (e.g., "EPSG:4326")
- edges: "planar" (default) or "spherical"
- ...other supported keys
See the specification for details: https://geoparquet.org/releases/v1.1.0/

Useful for:
- Legacy Parquet files with Binary columns containing WKB payloads.
- Overriding GeoParquet metadata when fields like `crs` are missing.

Precedence:
- GeoParquet metadata is used to infer geometry columns first.
- geometry_columns then overrides the auto-inferred schema:
- If a column is not geometry in metadata but appears in
geometry_columns, it is treated as a geometry column.
- If a column is geometry in metadata and also appears in
geometry_columns, the provided metadata replaces the inferred
metadata for that column. Missing optional fields are treated
as absent/defaults.

Example:
- For `geo.parquet(geo1: geometry, geo2: geometry, geo3: binary)`,
`read_parquet("geo.parquet", geometry_columns='{"geo2": {"encoding": "WKB"}, "geo3": {"encoding": "WKB"}}')`
overrides `geo2` metadata and treats `geo3` as a geometry column.
- If `geo` inferred from metadata has:
- `geo: {"encoding": "wkb", "crs": "EPSG:4326", ..}`
and geometry_columns provides:
- `geo: {"encoding": "wkb", "crs": "EPSG:3857"}`
then the result is (full overwrite):
- `geo: {"encoding": "wkb", "crs": "EPSG:3857", ..}` (other fields are defaulted)


Safety:
- Columns specified here are not validated against the provided options
(e.g., WKB encoding checks); inconsistent data may cause undefined
behavior.


Examples:

>>> sd = sedona.db.connect()
>>> url = "https://github.com/apache/sedona-testing/raw/refs/heads/main/data/parquet/geoparquet-1.1.0.parquet"
>>> sd.read_parquet(url)
<sedonadb.dataframe.DataFrame object at ...>

"""
if isinstance(table_paths, (str, Path)):
table_paths = [table_paths]

if options is None:
options = {}

if geometry_columns is not None and not isinstance(geometry_columns, str):
geometry_columns = json.dumps(geometry_columns)

return DataFrame(
self._impl,
self._impl.read_parquet([str(path) for path in table_paths], options),
self._impl.read_parquet(
[str(path) for path in table_paths], options, geometry_columns
),
self.options,
)

Expand Down
11 changes: 10 additions & 1 deletion python/sedonadb/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl InternalContext {
py: Python<'py>,
table_paths: Vec<String>,
options: HashMap<String, PyObject>,
geometry_columns: Option<String>,
) -> Result<InternalDataFrame, PySedonaError> {
// Convert Python options to strings, filtering out None values
let rust_options: HashMap<String, String> = options
Expand All @@ -97,9 +98,17 @@ impl InternalContext {
})
.collect();

let geo_options =
let mut geo_options =
sedona_geoparquet::provider::GeoParquetReadOptions::from_table_options(rust_options)
.map_err(|e| PySedonaError::SedonaPython(format!("Invalid table options: {e}")))?;
if let Some(geometry_columns) = geometry_columns {
geo_options = geo_options
.with_geometry_columns_json(&geometry_columns)
.map_err(|e| {
PySedonaError::SedonaPython(format!("Invalid geometry_columns JSON: {e}"))
})?;
}

let df = wait_for_future(
py,
&self.runtime,
Expand Down
165 changes: 165 additions & 0 deletions python/sedonadb/tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,37 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from pathlib import Path
from typing import Any, Mapping

import geoarrow.pyarrow as ga # noqa: F401
import pyarrow as pa
import pyarrow.parquet as pq
import pytest
import sedonadb
import shapely


def _parse_geo_metadata(geoparquet_path: Path) -> Mapping[str, Any]:
    """Load and decode the GeoParquet ``geo`` key-value metadata, asserting it exists."""
    file_metadata = pq.read_metadata(geoparquet_path).metadata
    assert file_metadata is not None

    raw_geo = file_metadata.get(b"geo")
    assert raw_geo is not None

    return json.loads(raw_geo.decode())


def _geom_column_metadata(
    geoparquet_path: Path, column_name: str = "geom"
) -> Mapping[str, Any]:
    """Return the GeoParquet metadata entry for *column_name*, asserting it exists."""
    columns = _parse_geo_metadata(geoparquet_path).get("columns")
    assert isinstance(columns, dict)
    assert column_name in columns
    return columns[column_name]


def test_options():
Expand Down Expand Up @@ -100,6 +127,144 @@ def test_read_parquet_options_parameter(con, geoarrow_data):
) # Should be identical (option ignored but not errored)


# Basic test for `geometry_columns` option for `read_parquet(..)`
def test_read_parquet_geometry_columns_roundtrip(con, tmp_path):
    """End-to-end checks for ``read_parquet(geometry_columns=...)``.

    Starts from a plain Parquet file with a Binary column holding WKB,
    then exercises validation errors, metadata overrides, and writing the
    result back out as GeoParquet.
    """
    # Write a regular Parquet table with a Binary WKB column.
    geom = shapely.from_wkt("POINT (0 1)").wkb
    table = pa.table({"id": [1], "geom": [geom]})
    src = tmp_path / "plain.parquet"
    pq.write_table(table, src)

    # GeoParquet metadata should not be present on the plain file.
    metadata = pq.read_metadata(src).metadata
    assert metadata is not None
    assert b"geo" not in metadata

    # Test 1: when adding a new geometry column, `encoding` must be provided.
    geometry_columns = json.dumps({"geom": {"crs": "EPSG:4326"}})
    with pytest.raises(
        sedonadb._lib.SedonaError,
        match="missing field `encoding`",
    ):
        con.read_parquet(src, geometry_columns=geometry_columns)

    # Test 2: mark 'geom' as geometry and round-trip to GeoParquet.
    geometry_columns = json.dumps({"geom": {"encoding": "WKB"}})
    df = con.read_parquet(src, geometry_columns=geometry_columns)
    out_geo1 = tmp_path / "geo1.parquet"
    df.to_parquet(out_geo1)

    geom_meta = _geom_column_metadata(out_geo1)
    assert geom_meta["encoding"] == "WKB"

    # Test 3: overriding an existing geometry column also requires `encoding`.
    geometry_columns = json.dumps({"geom": {"crs": "EPSG:3857"}})
    with pytest.raises(
        sedonadb._lib.SedonaError,
        match="missing field `encoding`",
    ):
        con.read_parquet(out_geo1, geometry_columns=geometry_columns)

    # Test 4: override existing metadata with a full replacement.
    geometry_columns = json.dumps({"geom": {"encoding": "WKB", "crs": "EPSG:3857"}})
    df = con.read_parquet(out_geo1, geometry_columns=geometry_columns)
    out_geo2 = tmp_path / "geo2.parquet"
    df.to_parquet(out_geo2)

    geom_meta = _geom_column_metadata(out_geo2)
    assert geom_meta["encoding"] == "WKB"
    assert geom_meta["crs"] == "EPSG:3857"

    # Test 5: overriding with EPSG:4326 results in no `crs` field on write.
    # NOTE(review): EPSG:4326 appears to be treated as the GeoParquet default
    # CRS and is omitted from the written metadata — confirm against the writer.
    geometry_columns = json.dumps({"geom": {"encoding": "WKB", "crs": "EPSG:4326"}})
    df = con.read_parquet(out_geo2, geometry_columns=geometry_columns)
    out_geo3 = tmp_path / "geo3.parquet"
    df.to_parquet(out_geo3)

    geom_meta = _geom_column_metadata(out_geo3)
    assert geom_meta["encoding"] == "WKB"
    assert "crs" not in geom_meta

    # Test 6: adding `geometry_types` is allowed and replaces prior metadata
    # (the previous crs override is not carried over — full overwrite).
    geometry_columns = json.dumps(
        {"geom": {"encoding": "WKB", "geometry_types": ["Point"]}}
    )
    df = con.read_parquet(out_geo3, geometry_columns=geometry_columns)
    out_geo4 = tmp_path / "geo4.parquet"
    df.to_parquet(out_geo4)
    geom_meta = _geom_column_metadata(out_geo4)
    assert geom_meta["encoding"] == "WKB"
    assert "crs" not in geom_meta

    # Test 7: specify multiple options at once on plain Parquet input.
    geometry_columns = json.dumps(
        {
            "geom": {
                "encoding": "WKB",
                "crs": "EPSG:3857",
                "edges": "spherical",
                "geometry_types": ["Point"],
            }
        }
    )
    df = con.read_parquet(src, geometry_columns=geometry_columns)
    out_geo_multi = tmp_path / "geo_multi.parquet"
    df.to_parquet(out_geo_multi)
    geom_meta = _geom_column_metadata(out_geo_multi)
    assert geom_meta["encoding"] == "WKB"
    assert geom_meta["crs"] == "EPSG:3857"
    assert geom_meta["edges"] == "spherical"

    # Test 8: specifying a non-existent column raises an error.
    geometry_columns = json.dumps(
        {
            "geom_foo": {
                "encoding": "WKB",
            }
        }
    )
    with pytest.raises(
        sedonadb._lib.SedonaError, match="Geometry columns not found in schema"
    ):
        # No assignment: read_parquet raises, so a bound result is unreachable.
        con.read_parquet(src, geometry_columns=geometry_columns)


def test_read_parquet_geometry_columns_multiple_columns(con, tmp_path):
    """Verify geometry_columns can promote and override several columns at once."""
    # Plain Parquet input: two Binary columns carrying WKB payloads.
    wkb_a = shapely.from_wkt("POINT (0 1)").wkb
    wkb_b = shapely.from_wkt("POINT (1 2)").wkb
    source_path = tmp_path / "plain_multi.parquet"
    pq.write_table(
        pa.table({"id": [1], "geom1": [wkb_a], "geom2": [wkb_b]}), source_path
    )

    # First pass: promote only geom1 to a geometry column and write GeoParquet.
    first_out = tmp_path / "geo_multi1.parquet"
    con.read_parquet(
        source_path, geometry_columns=json.dumps({"geom1": {"encoding": "WKB"}})
    ).to_parquet(first_out)

    written_geo = _parse_geo_metadata(first_out)
    assert "geom1" in written_geo["columns"]
    assert "geom2" not in written_geo["columns"]

    # Second pass: override geom1's CRS and promote geom2 in a single call.
    overrides = {
        "geom1": {"encoding": "WKB", "crs": "EPSG:3857"},
        "geom2": {"encoding": "WKB"},
    }
    second_out = tmp_path / "geo_multi2.parquet"
    con.read_parquet(first_out, geometry_columns=json.dumps(overrides)).to_parquet(
        second_out
    )

    meta_geom1 = _geom_column_metadata(second_out, "geom1")
    meta_geom2 = _geom_column_metadata(second_out, "geom2")
    assert meta_geom1["encoding"] == "WKB"
    assert meta_geom1["crs"] == "EPSG:3857"
    assert meta_geom2["encoding"] == "WKB"


def test_read_geoparquet_s3_anonymous_access():
"""Test reading from a public S3 bucket geoparquet file with anonymous access"""
con = sedonadb.connect()
Expand Down
Loading