diff --git a/Cargo.lock b/Cargo.lock
index bc6799a91..a4a2d5e9b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5499,6 +5499,7 @@ dependencies = [
  "sedona-proj",
  "sedona-schema",
  "sedona-tg",
+ "serde_json",
  "thiserror 2.0.17",
  "tokio",
 ]
diff --git a/python/sedonadb/Cargo.toml b/python/sedonadb/Cargo.toml
index 426bed90e..177e96cee 100644
--- a/python/sedonadb/Cargo.toml
+++ b/python/sedonadb/Cargo.toml
@@ -50,6 +50,7 @@ sedona-geoparquet = { workspace = true }
 sedona-schema = { workspace = true }
 sedona-proj = { workspace = true }
 sedona-tg = { workspace = true }
+serde_json = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 mimalloc = { workspace = true, optional = true }
diff --git a/python/sedonadb/python/sedonadb/context.py b/python/sedonadb/python/sedonadb/context.py
index a3a624ac5..21a380aec 100644
--- a/python/sedonadb/python/sedonadb/context.py
+++ b/python/sedonadb/python/sedonadb/context.py
@@ -15,6 +15,7 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import json
 import os
 import sys
 from functools import cached_property
@@ -126,6 +127,7 @@ def read_parquet(
         self,
         table_paths: Union[str, Path, Iterable[str]],
         options: Optional[Dict[str, Any]] = None,
+        geometry_columns: Optional[Union[str, Dict[str, Any]]] = None,
     ) -> DataFrame:
         """Create a [DataFrame][sedonadb.dataframe.DataFrame] from one or more Parquet files
 
@@ -134,6 +136,50 @@
                 files.
             options: Optional dictionary of options to pass to the Parquet reader.
                 For S3 access, use {"aws.skip_signature": True, "aws.region": "us-west-2"}
                 for anonymous access to public buckets.
+            geometry_columns: Optional JSON string or dict mapping column name to
+                GeoParquet column metadata (e.g., {"geom": {"encoding": "WKB"}}).
+                Use this to mark binary WKB columns as geometry columns or to
+                correct metadata such as the column CRS.
+
+                Supported keys:
+                - encoding: "WKB" (required)
+                - crs: the coordinate reference system (e.g., "EPSG:4326")
+                - edges: "planar" (default) or "spherical"
+                - ...other keys supported by the GeoParquet specification.
+                See the specification for details: https://geoparquet.org/releases/v1.1.0/
+
+                Useful for:
+                - Legacy Parquet files with Binary columns containing WKB payloads.
+                - Overriding GeoParquet metadata when fields like `crs` are missing.
+
+                Precedence:
+                - GeoParquet metadata is used to infer geometry columns first.
+                - geometry_columns then overrides the auto-inferred schema:
+                  - If a column is not geometry in metadata but appears in
+                    geometry_columns, it is treated as a geometry column.
+                  - If a column is geometry in metadata and also appears in
+                    geometry_columns, the provided metadata fully replaces the
+                    inferred metadata for that column. Optional fields missing
+                    from the override are treated as absent or take their
+                    defaults.
+
+                Example:
+                - For `geo.parquet(geo1: geometry, geo2: geometry, geo3: binary)`,
+                  `read_parquet("geo.parquet", geometry_columns='{"geo2": {"encoding": "WKB"}, "geo3": {"encoding": "WKB"}}')`
+                  overrides `geo2` metadata and treats `geo3` as a geometry column.
+                - If `geo` inferred from metadata has:
+                  - `geo: {"encoding": "wkb", "crs": "EPSG:4326", ..}`
+                  and geometry_columns provides:
+                  - `geo: {"encoding": "wkb", "crs": "EPSG:3857"}`
+                  then the result is a full overwrite:
+                  - `geo: {"encoding": "wkb", "crs": "EPSG:3857"}` (the other
+                    inferred fields are dropped and take their defaults)
+
+                Safety:
+                - Columns specified here are not validated against the underlying
+                  data (e.g., no WKB encoding checks are performed); inconsistent
+                  metadata may cause undefined behavior.
+
         Examples:
 
@@ -141,7 +187,6 @@ def read_parquet(
             >>> url = "https://github.com/apache/sedona-testing/raw/refs/heads/main/data/parquet/geoparquet-1.1.0.parquet"
             >>> sd.read_parquet(url)
-
         """
         if isinstance(table_paths, (str, Path)):
             table_paths = [table_paths]
 
@@ -149,9 +194,14 @@
         if options is None:
             options = {}
 
+        if geometry_columns is not None and not isinstance(geometry_columns, str):
+            geometry_columns = json.dumps(geometry_columns)
+
         return DataFrame(
             self._impl,
-            self._impl.read_parquet([str(path) for path in table_paths], options),
+            self._impl.read_parquet(
+                [str(path) for path in table_paths], options, geometry_columns
+            ),
             self.options,
         )
diff --git a/python/sedonadb/src/context.rs b/python/sedonadb/src/context.rs
index 67ad8dcc1..3bb7c5e35 100644
--- a/python/sedonadb/src/context.rs
+++ b/python/sedonadb/src/context.rs
@@ -80,6 +80,7 @@ impl InternalContext {
         py: Python<'py>,
         table_paths: Vec<String>,
         options: HashMap<String, Bound<'py, PyAny>>,
+        geometry_columns: Option<String>,
     ) -> Result<InternalDataFrame, PySedonaError> {
         // Convert Python options to strings, filtering out None values
         let rust_options: HashMap<String, String> = options
@@ -97,9 +98,17 @@
             })
             .collect();
 
-        let geo_options =
+        let mut geo_options =
             sedona_geoparquet::provider::GeoParquetReadOptions::from_table_options(rust_options)
                 .map_err(|e| PySedonaError::SedonaPython(format!("Invalid table options: {e}")))?;
+        if let Some(geometry_columns) = geometry_columns {
+            geo_options = geo_options
+                .with_geometry_columns_json(&geometry_columns)
+                .map_err(|e| {
+                    PySedonaError::SedonaPython(format!("Invalid geometry_columns JSON: {e}"))
+                })?;
+        }
+
         let df = wait_for_future(
             py,
             &self.runtime,
diff --git a/python/sedonadb/tests/test_context.py b/python/sedonadb/tests/test_context.py
index d9b1d33bd..6b876eedc 100644
--- a/python/sedonadb/tests/test_context.py
+++ b/python/sedonadb/tests/test_context.py
@@ -14,10 +14,37 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
+import json
+from pathlib import Path
+from typing import Any, Mapping
+
 import geoarrow.pyarrow as ga  # noqa: F401
 import pyarrow as pa
+import pyarrow.parquet as pq
 import pytest
 import sedonadb
+import shapely
+
+
+def _parse_geo_metadata(geoparquet_path: Path) -> Mapping[str, Any]:
+    """Return the GeoParquet "geo" metadata map, asserting it exists."""
+    metadata = pq.read_metadata(geoparquet_path).metadata
+    assert metadata is not None
+
+    geo = metadata.get(b"geo")
+    assert geo is not None
+
+    return json.loads(geo.decode())
+
+
+def _geom_column_metadata(
+    geoparquet_path: Path, column_name: str = "geom"
+) -> Mapping[str, Any]:
+    geo_metadata = _parse_geo_metadata(geoparquet_path)
+    columns = geo_metadata.get("columns")
+    assert isinstance(columns, dict)
+    assert column_name in columns
+    return columns[column_name]
 
 
 def test_options():
@@ -100,6 +127,144 @@ def test_read_parquet_options_parameter(con, geoarrow_data):
     )  # Should be identical (option ignored but not errored)
 
 
+# Basic tests for the `geometry_columns` option of `read_parquet(..)`.
+def test_read_parquet_geometry_columns_roundtrip(con, tmp_path):
+    # Write a regular Parquet table with a Binary WKB column.
+    geom = shapely.from_wkt("POINT (0 1)").wkb
+    table = pa.table({"id": [1], "geom": [geom]})
+    src = tmp_path / "plain.parquet"
+    pq.write_table(table, src)
+
+    # GeoParquet metadata should not be present.
+    metadata = pq.read_metadata(src).metadata
+    assert metadata is not None
+    assert b"geo" not in metadata
+
+    # Test 1: when adding a new geometry column, `encoding` must be provided.
+    geometry_columns = json.dumps({"geom": {"crs": "EPSG:4326"}})
+    with pytest.raises(
+        sedonadb._lib.SedonaError,
+        match="missing field `encoding`",
+    ):
+        con.read_parquet(src, geometry_columns=geometry_columns)
+
+    # Test 2: mark 'geom' as geometry and round-trip to GeoParquet.
+    geometry_columns = json.dumps({"geom": {"encoding": "WKB"}})
+    df = con.read_parquet(src, geometry_columns=geometry_columns)
+    out_geo1 = tmp_path / "geo1.parquet"
+    df.to_parquet(out_geo1)
+
+    geom_meta = _geom_column_metadata(out_geo1)
+    assert geom_meta["encoding"] == "WKB"
+
+    # Test 3: overriding an existing geometry column requires `encoding`.
+    geometry_columns = json.dumps({"geom": {"crs": "EPSG:3857"}})
+    with pytest.raises(
+        sedonadb._lib.SedonaError,
+        match="missing field `encoding`",
+    ):
+        con.read_parquet(out_geo1, geometry_columns=geometry_columns)
+
+    # Test 4: override existing metadata with a full replacement.
+    geometry_columns = json.dumps({"geom": {"encoding": "WKB", "crs": "EPSG:3857"}})
+    df = con.read_parquet(out_geo1, geometry_columns=geometry_columns)
+    out_geo2 = tmp_path / "geo2.parquet"
+    df.to_parquet(out_geo2)
+
+    geom_meta = _geom_column_metadata(out_geo2)
+    assert geom_meta["encoding"] == "WKB"
+    assert geom_meta["crs"] == "EPSG:3857"
+
+    # Test 5: overriding with a different CRS replaces the previous value
+    # (EPSG:4326 is the default CRS, so the writer omits the `crs` field).
+    geometry_columns = json.dumps({"geom": {"encoding": "WKB", "crs": "EPSG:4326"}})
+    df = con.read_parquet(out_geo2, geometry_columns=geometry_columns)
+    out_geo3 = tmp_path / "geo3.parquet"
+    df.to_parquet(out_geo3)
+
+    geom_meta = _geom_column_metadata(out_geo3)
+    assert geom_meta["encoding"] == "WKB"
+    assert "crs" not in geom_meta
+
+    # Test 6: adding `geometry_types` is allowed and replaces prior metadata.
+    geometry_columns = json.dumps(
+        {"geom": {"encoding": "WKB", "geometry_types": ["Point"]}}
+    )
+    df = con.read_parquet(out_geo3, geometry_columns=geometry_columns)
+    out_geo4 = tmp_path / "geo4.parquet"
+    df.to_parquet(out_geo4)
+    geom_meta = _geom_column_metadata(out_geo4)
+    assert geom_meta["encoding"] == "WKB"
+    assert "crs" not in geom_meta
+
+    # Test 7: specify multiple options on plain Parquet input.
+    geometry_columns = json.dumps(
+        {
+            "geom": {
+                "encoding": "WKB",
+                "crs": "EPSG:3857",
+                "edges": "spherical",
+                "geometry_types": ["Point"],
+            }
+        }
+    )
+    df = con.read_parquet(src, geometry_columns=geometry_columns)
+    out_geo_multi = tmp_path / "geo_multi.parquet"
+    df.to_parquet(out_geo_multi)
+    geom_meta = _geom_column_metadata(out_geo_multi)
+    assert geom_meta["encoding"] == "WKB"
+    assert geom_meta["crs"] == "EPSG:3857"
+    assert geom_meta["edges"] == "spherical"
+
+    # Test 8: specifying a non-existent column raises an error.
+    geometry_columns = json.dumps(
+        {
+            "geom_foo": {
+                "encoding": "WKB",
+            }
+        }
+    )
+    with pytest.raises(
+        sedonadb._lib.SedonaError, match="Geometry columns not found in schema"
+    ):
+        con.read_parquet(src, geometry_columns=geometry_columns)
+
+
+def test_read_parquet_geometry_columns_multiple_columns(con, tmp_path):
+    # Write a regular Parquet table with two Binary WKB columns.
+    geom1 = shapely.from_wkt("POINT (0 1)").wkb
+    geom2 = shapely.from_wkt("POINT (1 2)").wkb
+    table = pa.table({"id": [1], "geom1": [geom1], "geom2": [geom2]})
+    src = tmp_path / "plain_multi.parquet"
+    pq.write_table(table, src)
+
+    # Mark geom1 as geometry and write GeoParquet.
+    geometry_columns = json.dumps({"geom1": {"encoding": "WKB"}})
+    df = con.read_parquet(src, geometry_columns=geometry_columns)
+    out_geo1 = tmp_path / "geo_multi1.parquet"
+    df.to_parquet(out_geo1)
+
+    geo_metadata = _parse_geo_metadata(out_geo1)
+    assert "geom1" in geo_metadata["columns"]
+    assert "geom2" not in geo_metadata["columns"]
+
+    # Mark geom2 as geometry and override geom1 in one call.
+    geometry_columns = json.dumps(
+        {
+            "geom1": {"encoding": "WKB", "crs": "EPSG:3857"},
+            "geom2": {"encoding": "WKB"},
+        }
+    )
+    df = con.read_parquet(out_geo1, geometry_columns=geometry_columns)
+    out_geo2 = tmp_path / "geo_multi2.parquet"
+    df.to_parquet(out_geo2)
+
+    geom1_meta = _geom_column_metadata(out_geo2, "geom1")
+    geom2_meta = _geom_column_metadata(out_geo2, "geom2")
+    assert geom1_meta["encoding"] == "WKB"
+    assert geom1_meta["crs"] == "EPSG:3857"
+    assert geom2_meta["encoding"] == "WKB"
+
+
 def test_read_geoparquet_s3_anonymous_access():
     """Test reading from a public S3 bucket geoparquet file with anonymous access"""
     con = sedonadb.connect()
diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs
index 0da807b3c..dbf3c2b16 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -15,7 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{any::Any, collections::HashMap, sync::Arc};
+use std::{
+    any::Any,
+    collections::{HashMap, HashSet},
+    sync::Arc,
+};
 
 use arrow_schema::{Schema, SchemaRef};
 use async_trait::async_trait;
@@ -49,7 +53,7 @@ use sedona_schema::extension_type::ExtensionType;
 
 use crate::{
     file_opener::{storage_schema_contains_geo, GeoParquetFileOpener},
-    metadata::{GeoParquetColumnEncoding, GeoParquetMetadata},
+    metadata::{GeoParquetColumnEncoding, GeoParquetColumnMetadata, GeoParquetMetadata},
     options::TableGeoParquetOptions,
     writer::create_geoparquet_writer_physical_plan,
 };
@@ -146,6 +150,19 @@ impl GeoParquetFormat {
     }
 }
 
+/// Merge geometry column metadata.
+/// Columns in `overrides` replace any inferred metadata for the same column name.
+fn merge_geometry_columns(
+    base: &mut HashMap<String, GeoParquetColumnMetadata>,
+    overrides: &HashMap<String, GeoParquetColumnMetadata>,
+) -> Result<()> {
+    for (column_name, override_meta) in overrides {
+        base.insert(column_name.clone(), override_meta.clone());
+    }
+
+    Ok(())
+}
+
 #[async_trait]
 impl FileFormat for GeoParquetFormat {
     fn as_any(&self) -> &dyn Any {
@@ -201,6 +218,8 @@
             .try_collect()
             .await?;
 
+        // Combine metadata from multiple partitioned GeoParquet files into a single
+        // one. See the comments in `try_update(..)` for the specific behavior.
         let mut geoparquet_metadata: Option<GeoParquetMetadata> = None;
         for metadata in &metadatas {
             if let Some(kv) = metadata.file_metadata().key_value_metadata() {
@@ -222,38 +241,58 @@
             }
         }
 
-        if let Some(geo_metadata) = geoparquet_metadata {
-            let new_fields: Result<Vec<FieldRef>> = inner_schema_without_metadata
-                .fields()
-                .iter()
-                .map(|field| {
-                    if let Some(geo_column) = geo_metadata.columns.get(field.name()) {
-                        match geo_column.encoding {
-                            GeoParquetColumnEncoding::WKB => {
-                                let extension = ExtensionType::new(
-                                    "geoarrow.wkb",
-                                    field.data_type().clone(),
-                                    Some(geo_column.to_geoarrow_metadata()?),
-                                );
-                                Ok(Arc::new(
-                                    extension.to_field(field.name(), field.is_nullable()),
-                                ))
-                            }
-                            _ => plan_err!(
-                                "Unsupported GeoParquet encoding: {}",
-                                geo_column.encoding
-                            ),
+        // Geometry columns have been inferred from metadata; next, combine column
+        // metadata from the options with the inferred ones.
+        let mut inferred_geo_cols = match geoparquet_metadata {
+            Some(geo_metadata) => geo_metadata.columns,
+            None => HashMap::new(),
+        };
+
+        if let Some(geometry_columns) = &self.options.geometry_columns {
+            merge_geometry_columns(&mut inferred_geo_cols, geometry_columns)?;
+        }
+
+        if inferred_geo_cols.is_empty() {
+            return Ok(inner_schema_without_metadata);
+        }
+
+        let mut remaining: HashSet<String> = inferred_geo_cols.keys().cloned().collect();
+        let new_fields: Result<Vec<FieldRef>> = inner_schema_without_metadata
+            .fields()
+            .iter()
+            .map(|field| {
+                if let Some(geo_column) = inferred_geo_cols.get(field.name()) {
+                    remaining.remove(field.name());
+                    let encoding = geo_column.encoding;
+                    match encoding {
+                        GeoParquetColumnEncoding::WKB => {
+                            let extension = ExtensionType::new(
+                                "geoarrow.wkb",
+                                field.data_type().clone(),
+                                Some(geo_column.to_geoarrow_metadata()?),
+                            );
+                            Ok(Arc::new(
+                                extension.to_field(field.name(), field.is_nullable()),
+                            ))
                         }
-                    } else {
-                        Ok(field.clone())
+                        _ => plan_err!("Unsupported GeoParquet encoding: {}", encoding),
                     }
-                })
-                .collect();
+                } else {
+                    Ok(field.clone())
+                }
+            })
+            .collect();
 
-            Ok(Arc::new(Schema::new(new_fields?)))
-        } else {
-            Ok(inner_schema_without_metadata)
+        if !remaining.is_empty() {
+            let mut missing: Vec<_> = remaining.into_iter().collect();
+            missing.sort();
+            return plan_err!(
+                "Geometry columns not found in schema: {}",
+                missing.join(", ")
+            );
         }
+
+        Ok(Arc::new(Schema::new(new_fields?)))
     }
 
     async fn infer_stats(
diff --git a/rust/sedona-geoparquet/src/options.rs b/rust/sedona-geoparquet/src/options.rs
index 0301716f2..eaa53dd5d 100644
--- a/rust/sedona-geoparquet/src/options.rs
+++ b/rust/sedona-geoparquet/src/options.rs
@@ -15,11 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::str::FromStr;
+use std::{collections::HashMap, str::FromStr};
 
 use datafusion::config::TableParquetOptions;
 use datafusion_common::{plan_err, DataFusionError};
 
+use crate::metadata::GeoParquetColumnMetadata;
+
 /// [TableParquetOptions] wrapper with GeoParquet-specific options
 #[derive(Debug, Default, Clone)]
 pub struct TableGeoParquetOptions {
@@ -30,6 +32,8 @@
     /// When writing [GeoParquetVersion::V1_1], use `true` to overwrite existing
     /// bounding box columns.
     pub overwrite_bbox_columns: bool,
+    /// Optional geometry column metadata overrides for schema inference.
+    pub geometry_columns: Option<HashMap<String, GeoParquetColumnMetadata>>,
 }
 
 impl TableGeoParquetOptions {
diff --git a/rust/sedona-geoparquet/src/provider.rs b/rust/sedona-geoparquet/src/provider.rs
index 51b92c0ed..be2f5925d 100644
--- a/rust/sedona-geoparquet/src/provider.rs
+++ b/rust/sedona-geoparquet/src/provider.rs
@@ -27,9 +27,11 @@ use datafusion::{
     execution::{options::ReadOptions, SessionState},
     prelude::{ParquetReadOptions, SessionConfig, SessionContext},
 };
-use datafusion_common::{exec_err, Result};
+use datafusion_common::{exec_err, plan_err, Result};
 
-use crate::format::GeoParquetFormat;
+use crate::{
+    format::GeoParquetFormat, metadata::GeoParquetColumnMetadata, options::TableGeoParquetOptions,
+};
 
 /// Create a [ListingTable] of GeoParquet (or normal Parquet) files
 ///
@@ -81,6 +83,7 @@
 pub struct GeoParquetReadOptions<'a> {
     inner: ParquetReadOptions<'a>,
     table_options: Option<HashMap<String, String>>,
+    geometry_columns: Option<HashMap<String, GeoParquetColumnMetadata>>,
 }
 
 impl GeoParquetReadOptions<'_> {
@@ -185,6 +188,7 @@
         Ok(GeoParquetReadOptions {
             inner: ParquetReadOptions::default(),
             table_options: Some(options),
+            geometry_columns: None,
         })
     }
 
@@ -192,6 +196,36 @@
     pub fn table_options(&self) -> Option<&HashMap<String, String>> {
         self.table_options.as_ref()
    }
+
+    /// Add geometry column metadata (JSON string) to apply during schema resolution
+    ///
+    /// Reads Parquet files as if the provided columns were declared in the files'
+    /// GeoParquet metadata. If GeoParquet metadata is already present, the values
+    /// provided here override any definitions in the original metadata.
+    ///
+    /// Errors if an invalid JSON configuration string is provided.
+    pub fn with_geometry_columns_json(mut self, geometry_columns_json: &str) -> Result<Self> {
+        let geometry_columns = parse_geometry_columns_json(geometry_columns_json)?;
+        self.geometry_columns = Some(geometry_columns);
+        Ok(self)
+    }
+
+    /// Get the geometry column metadata
+    pub fn geometry_columns(&self) -> Option<&HashMap<String, GeoParquetColumnMetadata>> {
+        self.geometry_columns.as_ref()
+    }
+}
+
+fn parse_geometry_columns_json(
+    geometry_columns_json: &str,
+) -> Result<HashMap<String, GeoParquetColumnMetadata>> {
+    let columns: HashMap<String, GeoParquetColumnMetadata> =
+        match serde_json::from_str(geometry_columns_json) {
+            Ok(columns) => columns,
+            Err(e) => return plan_err!("geometry_columns must be valid JSON: {e}"),
+        };
+
+    Ok(columns)
 }
 
 #[async_trait]
@@ -213,7 +247,11 @@ impl ReadOptions<'_> for GeoParquetReadOptions<'_> {
         let mut options = self.inner.to_listing_options(config, table_options);
         if let Some(parquet_format) = options.format.as_any().downcast_ref::<ParquetFormat>() {
-            let geoparquet_options = parquet_format.options().clone().into();
+            let mut geoparquet_options =
+                TableGeoParquetOptions::from(parquet_format.options().clone());
+            if let Some(geometry_columns) = &self.geometry_columns {
+                geoparquet_options.geometry_columns = Some(geometry_columns.clone());
+            }
             options.format = Arc::new(GeoParquetFormat::new(geoparquet_options));
             return options;
         }
@@ -227,9 +265,11 @@
         state: SessionState,
         table_path: ListingTableUrl,
     ) -> Result<SchemaRef> {
-        self.to_listing_options(config, state.default_table_options())
+        let schema = self
+            .to_listing_options(config, state.default_table_options())
             .infer_schema(&state, &table_path)
-            .await
+            .await?;
+        Ok(schema)
     }
 }
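
Reviewer note: a minimal end-to-end sketch of the new `geometry_columns` option, assembled from the tests in this patch. The file names and point data are illustrative only, not part of the change.

```python
import json

import pyarrow as pa
import pyarrow.parquet as pq
import sedonadb
import shapely

# A plain Parquet file whose "geom" column is raw WKB bytes (no GeoParquet metadata).
geom = shapely.from_wkt("POINT (0 1)").wkb
pq.write_table(pa.table({"id": [1], "geom": [geom]}), "plain.parquet")

con = sedonadb.connect()

# Mark "geom" as a WKB geometry column; a dict is serialized to JSON internally.
df = con.read_parquet(
    "plain.parquet",
    geometry_columns={"geom": {"encoding": "WKB", "crs": "EPSG:4326"}},
)

# Equivalent call with a pre-serialized JSON string.
df = con.read_parquet(
    "plain.parquet",
    geometry_columns=json.dumps({"geom": {"encoding": "WKB", "crs": "EPSG:4326"}}),
)

# Round-trip: the output file now carries GeoParquet "geo" metadata for "geom".
df.to_parquet("geo.parquet")
```

Both forms reach the Rust reader as the same JSON payload via `with_geometry_columns_json` above.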