diff --git a/rust/sedona-geoparquet/src/file_opener.rs b/rust/sedona-geoparquet/src/file_opener.rs index e41fd7c43..ae2a56d2a 100644 --- a/rust/sedona-geoparquet/src/file_opener.rs +++ b/rust/sedona-geoparquet/src/file_opener.rs @@ -24,7 +24,9 @@ use datafusion::datasource::{ use datafusion_common::Result; use datafusion_datasource_parquet::metadata::DFParquetMetadata; use datafusion_physical_expr::PhysicalExpr; -use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; +use datafusion_physical_plan::metrics::{ + ExecutionPlanMetricsSet, MetricBuilder, MetricType, MetricValue, PruningMetrics, +}; use object_store::ObjectStore; use parquet::{ basic::LogicalType, @@ -49,34 +51,40 @@ use crate::metadata::{GeoParquetColumnMetadata, GeoParquetMetadata}; #[derive(Clone)] pub(crate) struct GeoParquetFileOpenerMetrics { - /// How many file ranges are pruned by [`SpatialFilter`] + /// How many file ranges are pruned or matched by [`SpatialFilter`] /// /// Note on "file range": an opener may read only part of a file rather than the /// entire file; that portion is referred to as the "file range". See [`PartitionedFile`] /// for details. - files_ranges_spatial_pruned: Count, - /// How many file ranges are matched by [`SpatialFilter`] - files_ranges_spatial_matched: Count, - /// How many row groups are pruned by [`SpatialFilter`] + files_ranges_spatial_pruned: PruningMetrics, + /// How many row groups are pruned or matched by [`SpatialFilter`] /// /// Note: row groups skipped during the file-level pruning step are not counted /// again here. - row_groups_spatial_pruned: Count, - /// How many row groups are matched by [`SpatialFilter`] - row_groups_spatial_matched: Count, + row_groups_spatial_pruned: PruningMetrics, } impl GeoParquetFileOpenerMetrics { pub fn new(execution_plan_global_metrics: &ExecutionPlanMetricsSet) -> Self { + let files_ranges_spatial_pruned = PruningMetrics::new(); + MetricBuilder::new(execution_plan_global_metrics) + .with_type(MetricType::SUMMARY) + .build(MetricValue::PruningMetrics { + name: "files_ranges_spatial_pruned".into(), + pruning_metrics: files_ranges_spatial_pruned.clone(), + }); + + let row_groups_spatial_pruned = PruningMetrics::new(); + MetricBuilder::new(execution_plan_global_metrics) + .with_type(MetricType::SUMMARY) + .build(MetricValue::PruningMetrics { + name: "row_groups_spatial_pruned".into(), + pruning_metrics: row_groups_spatial_pruned.clone(), + }); + Self { - files_ranges_spatial_pruned: MetricBuilder::new(execution_plan_global_metrics) - .global_counter("files_ranges_spatial_pruned"), - files_ranges_spatial_matched: MetricBuilder::new(execution_plan_global_metrics) - .global_counter("files_ranges_spatial_matched"), - row_groups_spatial_pruned: MetricBuilder::new(execution_plan_global_metrics) - .global_counter("row_groups_spatial_pruned"), - row_groups_spatial_matched: MetricBuilder::new(execution_plan_global_metrics) - .global_counter("row_groups_spatial_matched"), + files_ranges_spatial_pruned, + row_groups_spatial_pruned, } } } @@ -171,12 +179,12 @@ fn filter_access_plan_using_geoparquet_file_metadata( let table_geo_stats = TableGeoStatistics::try_from_stats_and_schema(&column_geo_stats, file_schema)?; if !spatial_filter.evaluate(&table_geo_stats)? { - metrics.files_ranges_spatial_pruned.add(1); + metrics.files_ranges_spatial_pruned.add_pruned(1); for i in access_plan.row_group_indexes() { access_plan.skip(i); } } else { - metrics.files_ranges_spatial_matched.add(1); + metrics.files_ranges_spatial_pruned.add_matched(1); } Ok(()) @@ -206,8 +214,8 @@ fn filter_access_plan_using_geoparquet_covering( let covering_specs = parse_column_coverings(file_schema, parquet_metadata, metadata)?; // If there are no covering specs, don't iterate through the row groups - // This has the side-effect of ensuring the row_groups_spatial_matched metric is not double - // counted except in the rare case where we prune based on both. + // This has the side-effect of ensuring the row_groups_spatial_pruned matched count is not + // double counted except in the rare case where we prune based on both. if covering_specs.iter().all(|spec| spec.is_none()) { return Ok(()); } @@ -224,10 +232,10 @@ fn filter_access_plan_using_geoparquet_covering( // Evaluate predicate! if !spatial_filter.evaluate(&row_group_geo_stats)? { - metrics.row_groups_spatial_pruned.add(1); + metrics.row_groups_spatial_pruned.add_pruned(1); access_plan.skip(i); } else { - metrics.row_groups_spatial_matched.add(1); + metrics.row_groups_spatial_pruned.add_matched(1); } } @@ -259,7 +267,7 @@ fn filter_access_plan_using_native_geostats( // If there are no native geometry or geography logical types at the // top level indices, don't iterate through the row groups. This has the side-effect - // of ensuring the row_groups_spatial_matched metric is not double counted except + // of ensuring the row_groups_spatial_pruned matched count is not double counted except // in the rare case where we prune based on both. let parquet_schema = parquet_metadata.file_metadata().schema_descr(); if top_level_indices.iter().all(|i| { @@ -283,10 +291,10 @@ fn filter_access_plan_using_native_geostats( // Evaluate predicate! if !spatial_filter.evaluate(&row_group_geo_stats)? { - metrics.row_groups_spatial_pruned.add(1); + metrics.row_groups_spatial_pruned.add_pruned(1); access_plan.skip(i); } else { - metrics.row_groups_spatial_matched.add(1); + metrics.row_groups_spatial_pruned.add_matched(1); } } diff --git a/rust/sedona/tests/metrics.rs b/rust/sedona/tests/metrics.rs index 520fd8e55..6b9675dfb 100644 --- a/rust/sedona/tests/metrics.rs +++ b/rust/sedona/tests/metrics.rs @@ -56,10 +56,8 @@ async fn geo_parquet_metrics() { "#; let prune_plan = run_and_format(&ctx, prune_query).await; - assert!(prune_plan.contains("files_ranges_spatial_pruned=1")); - assert!(prune_plan.contains("files_ranges_spatial_matched=0")); - assert!(prune_plan.contains("row_groups_spatial_pruned=0")); - assert!(prune_plan.contains("row_groups_spatial_matched=0")); + assert!(prune_plan.contains("files_ranges_spatial_pruned=1 total \u{2192} 0 matched")); + assert!(prune_plan.contains("row_groups_spatial_pruned=0 total \u{2192} 0 matched")); // Test 2: query with spatial filter that can't skip any file or row group // ----------------------------------------------------------------------- @@ -79,10 +77,8 @@ async fn geo_parquet_metrics() { "#; let match_plan = run_and_format(&ctx, match_query).await; - assert!(match_plan.contains("files_ranges_spatial_pruned=0")); - assert!(match_plan.contains("files_ranges_spatial_matched=1")); - assert!(match_plan.contains("row_groups_spatial_pruned=0")); - assert!(match_plan.contains("row_groups_spatial_matched=1")); + assert!(match_plan.contains("files_ranges_spatial_pruned=1 total \u{2192} 1 matched")); + assert!(match_plan.contains("row_groups_spatial_pruned=1 total \u{2192} 1 matched")); } async fn run_and_format(ctx: &SedonaContext, sql: &str) -> String {