diff --git a/Cargo.lock b/Cargo.lock index 9fde69c7c..37563fec0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5245,6 +5245,7 @@ dependencies = [ "tempfile", "tokio", "url", + "wkb", ] [[package]] diff --git a/python/sedonadb/python/sedonadb/context.py b/python/sedonadb/python/sedonadb/context.py index 21a380aec..f07938708 100644 --- a/python/sedonadb/python/sedonadb/context.py +++ b/python/sedonadb/python/sedonadb/context.py @@ -128,6 +128,7 @@ def read_parquet( table_paths: Union[str, Path, Iterable[str]], options: Optional[Dict[str, Any]] = None, geometry_columns: Optional[Union[str, Dict[str, Any]]] = None, + validate: bool = False, ) -> DataFrame: """Create a [DataFrame][sedonadb.dataframe.DataFrame] from one or more Parquet files @@ -176,9 +177,18 @@ def read_parquet( Safety: - - Columns specified here are not validated against the provided options - (e.g., WKB encoding checks); inconsistent data may cause undefined - behavior. + - Columns specified here can optionally be validated according to the + `validate` option (e.g., WKB encoding checks). If validation is not + enabled, inconsistent data may cause undefined behavior. + validate: + When set to `True`, geometry column contents are validated against + their metadata. Metadata can come from the source Parquet file or + the user-provided `geometry_columns` option. + Only supported properties are validated; unsupported properties are + ignored. If validation fails, execution stops with an error. + + Currently the only property that is validated is the WKB of input geometry + columns. Examples: @@ -200,7 +210,7 @@ def read_parquet( return DataFrame( self._impl, self._impl.read_parquet( - [str(path) for path in table_paths], options, geometry_columns + [str(path) for path in table_paths], options, geometry_columns, validate ), self.options, ) diff --git a/python/sedonadb/src/context.rs b/python/sedonadb/src/context.rs index 3bb7c5e35..1647bd05a 100644 --- a/python/sedonadb/src/context.rs +++ b/python/sedonadb/src/context.rs @@ -81,6 +81,7 @@ impl InternalContext { table_paths: Vec, options: HashMap, geometry_columns: Option, + validate: bool, ) -> Result { // Convert Python options to strings, filtering out None values let rust_options: HashMap = options @@ -108,6 +109,7 @@ impl InternalContext { PySedonaError::SedonaPython(format!("Invalid geometry_columns JSON: {e}")) })?; } + geo_options = geo_options.with_validate(validate); let df = wait_for_future( py, diff --git a/python/sedonadb/tests/io/test_parquet.py b/python/sedonadb/tests/io/test_parquet.py index 7f87c027f..c80d74780 100644 --- a/python/sedonadb/tests/io/test_parquet.py +++ b/python/sedonadb/tests/io/test_parquet.py @@ -22,6 +22,7 @@ import geopandas import geopandas.testing import pyarrow as pa +import pyarrow.parquet as pq import pytest import sedonadb import shapely @@ -412,3 +413,79 @@ def test_write_geoparquet_geography(con, geoarrow_data): table_roundtrip = con.read_parquet(tmp_parquet).to_arrow_table() assert table_roundtrip == table + + +def test_read_parquet_validate_wkb_single_valid_row(con, tmp_path): + valid_wkb = bytes.fromhex("0101000000000000000000F03F0000000000000040") + + table = pa.table({"id": [1], "geom": [valid_wkb]}) + path = tmp_path / "single_valid_wkb.parquet" + pq.write_table(table, path) + + geometry_columns = json.dumps({"geom": {"encoding": "WKB"}}) + + tab = con.read_parquet( + path, geometry_columns=geometry_columns, validate=False + ).to_arrow_table() + assert tab["geom"].type.extension_name == "geoarrow.wkb" + assert len(tab) == 1 + + tab = 
con.read_parquet( + path, geometry_columns=geometry_columns, validate=True + ).to_arrow_table() + assert tab["geom"].type.extension_name == "geoarrow.wkb" + assert len(tab) == 1 + + +def test_read_parquet_validate_wkb_single_invalid_row(con, tmp_path): + invalid_wkb = b"\x01" + + table = pa.table({"id": [1], "geom": [invalid_wkb]}) + path = tmp_path / "single_invalid_wkb.parquet" + pq.write_table(table, path) + + geometry_columns = json.dumps({"geom": {"encoding": "WKB"}}) + + tab = con.read_parquet( + path, geometry_columns=geometry_columns, validate=False + ).to_arrow_table() + assert tab["geom"].type.extension_name == "geoarrow.wkb" + assert len(tab) == 1 + + with pytest.raises( + sedonadb._lib.SedonaError, + match=r"WKB validation failed", + ): + con.read_parquet( + path, geometry_columns=geometry_columns, validate=True + ).to_arrow_table() + + +def test_read_parquet_validate_wkb_partial_invalid_rows(con, tmp_path): + valid_wkb = bytes.fromhex("0101000000000000000000F03F0000000000000040") + invalid_wkb = b"\x01" + + table = pa.table( + { + "id": [1, 2, 3], + "geom": [valid_wkb, invalid_wkb, valid_wkb], + } + ) + path = tmp_path / "partial_invalid_wkb.parquet" + pq.write_table(table, path) + + geometry_columns = json.dumps({"geom": {"encoding": "WKB"}}) + + tab = con.read_parquet( + path, geometry_columns=geometry_columns, validate=False + ).to_arrow_table() + assert tab["geom"].type.extension_name == "geoarrow.wkb" + assert len(tab) == 3 + + with pytest.raises( + sedonadb._lib.SedonaError, + match=r"WKB validation failed", + ): + con.read_parquet( + path, geometry_columns=geometry_columns, validate=True + ).to_arrow_table() diff --git a/rust/sedona-geoparquet/Cargo.toml b/rust/sedona-geoparquet/Cargo.toml index ba65d5a2d..f70fb4b8d 100644 --- a/rust/sedona-geoparquet/Cargo.toml +++ b/rust/sedona-geoparquet/Cargo.toml @@ -67,3 +67,4 @@ sedona-schema = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } serde_with = { workspace = true } +wkb = { workspace = true } diff --git a/rust/sedona-geoparquet/src/file_opener.rs b/rust/sedona-geoparquet/src/file_opener.rs index ae2a56d2a..72719c881 100644 --- a/rust/sedona-geoparquet/src/file_opener.rs +++ b/rust/sedona-geoparquet/src/file_opener.rs @@ -16,17 +16,22 @@ // under the License. 
use std::{collections::HashMap, sync::Arc}; -use arrow_schema::SchemaRef; +use arrow_array::{Array, RecordBatch}; +use arrow_schema::{DataType, SchemaRef}; use datafusion::datasource::{ listing::PartitionedFile, physical_plan::{parquet::ParquetAccessPlan, FileOpenFuture, FileOpener}, }; -use datafusion_common::Result; +use datafusion_common::{ + cast::{as_binary_array, as_binary_view_array, as_large_binary_array}, + exec_err, Result, +}; use datafusion_datasource_parquet::metadata::DFParquetMetadata; use datafusion_physical_expr::PhysicalExpr; use datafusion_physical_plan::metrics::{ ExecutionPlanMetricsSet, MetricBuilder, MetricType, MetricValue, PruningMetrics, }; +use futures::StreamExt; use object_store::ObjectStore; use parquet::{ basic::LogicalType, @@ -47,7 +52,10 @@ use sedona_geometry::{ }; use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher}; -use crate::metadata::{GeoParquetColumnMetadata, GeoParquetMetadata}; +use crate::{ + metadata::{GeoParquetColumnEncoding, GeoParquetMetadata}, + options::TableGeoParquetOptions, +}; #[derive(Clone)] pub(crate) struct GeoParquetFileOpenerMetrics { @@ -98,11 +106,11 @@ pub(crate) struct GeoParquetFileOpener { pub inner: Arc, pub object_store: Arc, pub metadata_size_hint: Option, - pub predicate: Arc, + pub predicate: Option>, pub file_schema: SchemaRef, pub enable_pruning: bool, pub metrics: GeoParquetFileOpenerMetrics, - pub overrides: Option>, + pub options: TableGeoParquetOptions, } impl FileOpener for GeoParquetFileOpener { @@ -118,37 +126,41 @@ impl FileOpener for GeoParquetFileOpener { let mut access_plan = ParquetAccessPlan::new_all(parquet_metadata.num_row_groups()); + let maybe_geoparquet_metadata = GeoParquetMetadata::try_from_parquet_metadata( + &parquet_metadata, + self_clone.options.geometry_columns.as_ref(), + )?; + if self_clone.enable_pruning { - let spatial_filter = SpatialFilter::try_from_expr(&self_clone.predicate)?; - - if let Some(geoparquet_metadata) = GeoParquetMetadata::try_from_parquet_metadata( - &parquet_metadata, - self_clone.overrides.as_ref(), - )? 
{ - filter_access_plan_using_geoparquet_file_metadata( - &self_clone.file_schema, - &mut access_plan, - &spatial_filter, - &geoparquet_metadata, - &self_clone.metrics, - )?; - - filter_access_plan_using_geoparquet_covering( - &self_clone.file_schema, - &mut access_plan, - &spatial_filter, - &geoparquet_metadata, - &parquet_metadata, - &self_clone.metrics, - )?; - - filter_access_plan_using_native_geostats( - &self_clone.file_schema, - &mut access_plan, - &spatial_filter, - &parquet_metadata, - &self_clone.metrics, - )?; + if let Some(predicate) = self_clone.predicate.as_ref() { + let spatial_filter = SpatialFilter::try_from_expr(predicate)?; + + if let Some(geoparquet_metadata) = maybe_geoparquet_metadata.as_ref() { + filter_access_plan_using_geoparquet_file_metadata( + &self_clone.file_schema, + &mut access_plan, + &spatial_filter, + geoparquet_metadata, + &self_clone.metrics, + )?; + + filter_access_plan_using_geoparquet_covering( + &self_clone.file_schema, + &mut access_plan, + &spatial_filter, + geoparquet_metadata, + &parquet_metadata, + &self_clone.metrics, + )?; + + filter_access_plan_using_native_geostats( + &self_clone.file_schema, + &mut access_plan, + &spatial_filter, + &parquet_metadata, + &self_clone.metrics, + )?; + } } } @@ -158,12 +170,110 @@ impl FileOpener for GeoParquetFileOpener { // We could also consider filtering using null_count here in the future (i.e., // skip row groups that are all null) let file = file.with_extensions(Arc::new(access_plan)); + let stream = self_clone.inner.open(file)?.await?; + + // Validate geometry columns when enabled from read option. + let validation_columns = if self_clone.options.validate { + maybe_geoparquet_metadata + .as_ref() + .map(|metadata| wkb_validation_columns(&self_clone.file_schema, metadata)) + .unwrap_or_default() + } else { + Vec::new() + }; - self_clone.inner.open(file)?.await + if !self_clone.options.validate || validation_columns.is_empty() { + return Ok(stream); + } + + let validated_stream = stream.map(move |batch_result| { + let batch = batch_result?; + validate_wkb_batch(&batch, &validation_columns)?; + Ok(batch) + }); + + Ok(Box::pin(validated_stream)) })) } } +fn wkb_validation_columns( + file_schema: &SchemaRef, + metadata: &GeoParquetMetadata, +) -> Vec<(usize, String)> { + file_schema + .fields() + .iter() + .enumerate() + .filter_map(|(column_index, field)| { + metadata + .columns + .get(field.name()) + .and_then(|column_metadata| { + if matches!(column_metadata.encoding, GeoParquetColumnEncoding::WKB) { + Some((column_index, field.name().clone())) + } else { + None + } + }) + }) + .collect() +} + +fn validate_wkb_batch(batch: &RecordBatch, validation_columns: &[(usize, String)]) -> Result<()> { + for (column_index, column_name) in validation_columns { + let column = batch.column(*column_index); + validate_wkb_array(column.as_ref(), column_name)?; + } + Ok(()) +} + +fn validate_wkb_array(array: &dyn Array, column_name: &str) -> Result<()> { + match array.data_type() { + DataType::Binary => { + let array = as_binary_array(array)?; + validate_wkb_values(array.iter(), column_name)?; + } + DataType::LargeBinary => { + let array = as_large_binary_array(array)?; + validate_wkb_values(array.iter(), column_name)?; + } + DataType::BinaryView => { + let array = as_binary_view_array(array)?; + validate_wkb_values(array.iter(), column_name)?; + } + other => { + return exec_err!( + "Expected Binary/LargeBinary/BinaryView storage for WKB validation in column '{}' but got {}", + column_name, + other + ); + } + } + + Ok(()) 
+} + +fn validate_wkb_values<'a>( + values: impl IntoIterator>, + column_name: &str, +) -> Result<()> { + for (row_index, maybe_wkb) in values.into_iter().enumerate() { + if let Some(wkb_bytes) = maybe_wkb { + if let Err(e) = wkb::reader::read_wkb(wkb_bytes) { + return exec_err!( + "WKB validation failed for column '{}' at row {}: {}", + column_name, + row_index, + e + ); + } + } + } + + Ok(()) +} + /// Filter an access plan using the GeoParquet file metadata /// /// Inspects the GeoParquetMetadata for a bbox at the column metadata level @@ -565,6 +675,9 @@ pub fn storage_schema_contains_geo(schema: &SchemaRef) -> bool { #[cfg(test)] mod test { + use std::sync::Arc; + + use arrow_array::{ArrayRef, BinaryArray, BinaryViewArray, Int64Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema}; use parquet::{ arrow::ArrowSchemaConverter, @@ -1199,6 +1312,54 @@ mod test { assert!(result.geometry_types().is_some()); } + #[test] + fn validate_wkb_array_binary() { + let valid_point_wkb: [u8; 21] = [ + 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, + ]; + + let valid_array: BinaryArray = [Some(valid_point_wkb.as_slice()), None].iter().collect(); + validate_wkb_array(&valid_array, "geom").unwrap(); + + let invalid_array: BinaryArray = [Some(&b"\x01"[..]), None].iter().collect(); + let err = validate_wkb_array(&invalid_array, "geom").unwrap_err(); + assert!(err.to_string().contains("WKB validation failed")); + } + + #[test] + fn validate_wkb_array_binary_view() { + let valid_point_wkb: [u8; 21] = [ + 0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, + ]; + + let valid_array: BinaryViewArray = + [Some(valid_point_wkb.as_slice()), None].iter().collect(); + validate_wkb_array(&valid_array, "geom").unwrap(); + + let invalid_array: BinaryViewArray = [Some(&b"\x01"[..]), None].iter().collect(); + let err = validate_wkb_array(&invalid_array, "geom").unwrap_err(); + assert!(err.to_string().contains("WKB validation failed")); + } + + #[test] + fn validate_wkb_batch_errors_on_invalid_wkb() { + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("geom", DataType::Binary, true), + ])); + + let id_column: ArrayRef = Arc::new(Int64Array::from(vec![Some(1)])); + let geom_array: BinaryArray = [Some(&b"\x01"[..])].iter().collect(); + let geom_column: ArrayRef = Arc::new(geom_array); + + let batch = RecordBatch::try_new(schema, vec![id_column, geom_column]).unwrap(); + let validation_columns = vec![(1, "geom".to_string())]; + let err = validate_wkb_batch(&batch, &validation_columns).unwrap_err(); + assert!(err.to_string().contains("WKB validation failed")); + } + fn file_schema_with_covering() -> SchemaRef { Arc::new(Schema::new(vec![ Field::new("not_geo", DataType::Binary, true), diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs index 352d23a3e..b8d50cd6f 100644 --- a/rust/sedona-geoparquet/src/format.rs +++ b/rust/sedona-geoparquet/src/format.rs @@ -53,7 +53,7 @@ use sedona_schema::extension_type::ExtensionType; use crate::{ file_opener::{storage_schema_contains_geo, GeoParquetFileOpener, GeoParquetFileOpenerMetrics}, - metadata::{GeoParquetColumnEncoding, GeoParquetColumnMetadata, GeoParquetMetadata}, + metadata::{GeoParquetColumnEncoding, GeoParquetMetadata}, options::TableGeoParquetOptions, writer::create_geoparquet_writer_physical_plan, }; @@ -341,10 +341,11 @@ impl 
FileFormat for GeoParquetFormat { } fn file_source(&self) -> Arc { - Arc::new( + let mut source = GeoParquetFileSource::try_from_file_source(self.inner().file_source(), None, None) - .unwrap(), - ) + .unwrap(); + source.options = self.options.clone(); + Arc::new(source) } } @@ -361,7 +362,7 @@ pub struct GeoParquetFileSource { inner: ParquetSource, metadata_size_hint: Option, predicate: Option>, - overrides: Option>, + options: TableGeoParquetOptions, } impl GeoParquetFileSource { @@ -371,7 +372,7 @@ impl GeoParquetFileSource { inner: ParquetSource::new(options.inner.clone()), metadata_size_hint: None, predicate: None, - overrides: options.geometry_columns.clone(), + options, } } @@ -419,7 +420,9 @@ impl GeoParquetFileSource { inner: parquet_source.clone(), metadata_size_hint, predicate: new_predicate, - overrides: None, + options: TableGeoParquetOptions::from( + parquet_source.table_parquet_options().clone(), + ), }) } else { sedona_internal_err!("GeoParquetFileSource constructed from non-ParquetSource") @@ -432,7 +435,7 @@ impl GeoParquetFileSource { inner: self.inner.with_predicate(predicate.clone()), metadata_size_hint: self.metadata_size_hint, predicate: Some(predicate), - overrides: self.overrides.clone(), + options: self.options.clone(), } } @@ -457,7 +460,7 @@ impl GeoParquetFileSource { inner: parquet_source, metadata_size_hint: self.metadata_size_hint, predicate: self.predicate.clone(), - overrides: self.overrides.clone(), + options: self.options.clone(), } } @@ -467,7 +470,7 @@ impl GeoParquetFileSource { inner: self.inner.clone().with_metadata_size_hint(hint), metadata_size_hint: Some(hint), predicate: self.predicate.clone(), - overrides: self.overrides.clone(), + options: self.options.clone(), } } } @@ -483,8 +486,7 @@ impl FileSource for GeoParquetFileSource { self.inner .create_file_opener(object_store.clone(), base_config, partition); - // If there are no geo columns or no pruning predicate, just return the inner opener - if self.predicate.is_none() || !storage_schema_contains_geo(base_config.file_schema()) { + if !storage_schema_contains_geo(base_config.file_schema()) { return inner_opener; } @@ -492,13 +494,13 @@ impl FileSource for GeoParquetFileSource { inner: inner_opener, object_store, metadata_size_hint: self.metadata_size_hint, - predicate: self.predicate.clone().unwrap(), + predicate: self.predicate.clone(), file_schema: base_config.file_schema().clone(), enable_pruning: self.inner.table_parquet_options().global.pruning, // HACK: Since there is no public API to set inner's metrics, so we use // inner's metrics as the ExecutionPlan-global metrics metrics: GeoParquetFileOpenerMetrics::new(self.inner.metrics()), - overrides: self.overrides.clone(), + options: self.options.clone(), }) } @@ -516,8 +518,7 @@ impl FileSource for GeoParquetFileSource { // TODO should this be None? None, )?; - // TODO: part of try_from_file_source()? 
- updated_inner.overrides = self.overrides.clone(); + updated_inner.options = self.options.clone(); Ok(inner_result.with_updated_node(Arc::new(updated_inner))) } None => Ok(inner_result), @@ -529,35 +530,43 @@ impl FileSource for GeoParquetFileSource { } fn with_batch_size(&self, batch_size: usize) -> Arc { - Arc::new(Self::from_file_source( + let mut source = Self::from_file_source( self.inner.with_batch_size(batch_size), self.metadata_size_hint, self.predicate.clone(), - )) + ); + source.options = self.options.clone(); + Arc::new(source) } fn with_schema(&self, schema: TableSchema) -> Arc { - Arc::new(Self::from_file_source( + let mut source = Self::from_file_source( self.inner.with_schema(schema), self.metadata_size_hint, self.predicate.clone(), - )) + ); + source.options = self.options.clone(); + Arc::new(source) } fn with_projection(&self, config: &FileScanConfig) -> Arc { - Arc::new(Self::from_file_source( + let mut source = Self::from_file_source( self.inner.with_projection(config), self.metadata_size_hint, self.predicate.clone(), - )) + ); + source.options = self.options.clone(); + Arc::new(source) } fn with_statistics(&self, statistics: Statistics) -> Arc { - Arc::new(Self::from_file_source( + let mut source = Self::from_file_source( self.inner.with_statistics(statistics), self.metadata_size_hint, self.predicate.clone(), - )) + ); + source.options = self.options.clone(); + Arc::new(source) } fn metrics(&self) -> &ExecutionPlanMetricsSet { diff --git a/rust/sedona-geoparquet/src/options.rs b/rust/sedona-geoparquet/src/options.rs index eaa53dd5d..2cc1fd125 100644 --- a/rust/sedona-geoparquet/src/options.rs +++ b/rust/sedona-geoparquet/src/options.rs @@ -34,6 +34,8 @@ pub struct TableGeoParquetOptions { pub overwrite_bbox_columns: bool, /// Optional geometry column metadata overrides for schema inference. pub geometry_columns: Option>, + /// Validate geometry column contents against metadata when reading. + pub validate: bool, } impl TableGeoParquetOptions { diff --git a/rust/sedona-geoparquet/src/provider.rs b/rust/sedona-geoparquet/src/provider.rs index be2f5925d..b9d2ba520 100644 --- a/rust/sedona-geoparquet/src/provider.rs +++ b/rust/sedona-geoparquet/src/provider.rs @@ -84,6 +84,7 @@ pub struct GeoParquetReadOptions<'a> { inner: ParquetReadOptions<'a>, table_options: Option>, geometry_columns: Option>, + validate: bool, } impl GeoParquetReadOptions<'_> { @@ -189,6 +190,7 @@ impl GeoParquetReadOptions<'_> { inner: ParquetReadOptions::default(), table_options: Some(options), geometry_columns: None, + validate: false, }) } @@ -214,6 +216,17 @@ impl GeoParquetReadOptions<'_> { pub fn geometry_columns(&self) -> Option<&HashMap> { self.geometry_columns.as_ref() } + + /// Enable/disable geometry content validation. + pub fn with_validate(mut self, validate: bool) -> Self { + self.validate = validate; + self + } + + /// Get whether geometry content validation is enabled. 
+ pub fn validate(&self) -> bool { + self.validate + } } fn parse_geometry_columns_json( @@ -252,6 +265,7 @@ impl ReadOptions<'_> for GeoParquetReadOptions<'_> { if let Some(geometry_columns) = &self.geometry_columns { geoparquet_options.geometry_columns = Some(geometry_columns.clone()); } + geoparquet_options.validate = self.validate; options.format = Arc::new(GeoParquetFormat::new(geoparquet_options)); return options; } diff --git a/rust/sedona-spatial-join/src/stream.rs b/rust/sedona-spatial-join/src/stream.rs index 177e7b0ff..9233e366d 100644 --- a/rust/sedona-spatial-join/src/stream.rs +++ b/rust/sedona-spatial-join/src/stream.rs @@ -1921,12 +1921,9 @@ mod tests { pos: 0, }; let mut produced_probe_indices: Vec = Vec::new(); - loop { - let Some((_, probe_indices)) = - progress.indices_for_next_batch(JoinSide::Left, join_type, max_batch_size) - else { - break; - }; + while let Some((_, probe_indices)) = + progress.indices_for_next_batch(JoinSide::Left, join_type, max_batch_size) + { let probe_indices = probe_indices.to_vec(); let adjust_range = progress.next_probe_range(&probe_indices); let build_indices = UInt64Array::from(vec![0; probe_indices.len()]);
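
For reference, a minimal usage sketch of the new `validate` flag from the Python API, mirroring the tests above. The `sedonadb.connect()` setup and the `points.parquet` path are assumptions for illustration only; the `geometry_columns` JSON shape and the "WKB validation failed" error text come from the tests in this change.

    import sedonadb

    con = sedonadb.connect()  # assumed entry point; the tests use a `con` fixture
    geometry_columns = '{"geom": {"encoding": "WKB"}}'

    # Default (validate=False): geometry contents are not checked, so malformed
    # WKB may surface later as undefined behavior.
    tab = con.read_parquet(
        "points.parquet", geometry_columns=geometry_columns
    ).to_arrow_table()

    # validate=True: WKB values in declared geometry columns are parsed during
    # execution; a malformed value stops the scan with "WKB validation failed".
    tab = con.read_parquet(
        "points.parquet",
        geometry_columns=geometry_columns,
        validate=True,
    ).to_arrow_table()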