Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 34 additions & 26 deletions rust/sedona-geoparquet/src/file_opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use datafusion::datasource::{
use datafusion_common::Result;
use datafusion_datasource_parquet::metadata::DFParquetMetadata;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder};
use datafusion_physical_plan::metrics::{
ExecutionPlanMetricsSet, MetricBuilder, MetricType, MetricValue, PruningMetrics,
};
use object_store::ObjectStore;
use parquet::{
basic::LogicalType,
Expand All @@ -49,34 +51,40 @@ use crate::metadata::{GeoParquetColumnMetadata, GeoParquetMetadata};

#[derive(Clone)]
pub(crate) struct GeoParquetFileOpenerMetrics {
/// How many file ranges are pruned by [`SpatialFilter`]
/// How many file ranges are pruned or matched by [`SpatialFilter`]
///
/// Note on "file range": an opener may read only part of a file rather than the
/// entire file; that portion is referred to as the "file range". See [`PartitionedFile`]
/// for details.
files_ranges_spatial_pruned: Count,
/// How many file ranges are matched by [`SpatialFilter`]
files_ranges_spatial_matched: Count,
/// How many row groups are pruned by [`SpatialFilter`]
files_ranges_spatial_pruned: PruningMetrics,
/// How many row groups are pruned or matched by [`SpatialFilter`]
///
/// Note: row groups skipped during the file-level pruning step are not counted
/// again here.
row_groups_spatial_pruned: Count,
/// How many row groups are matched by [`SpatialFilter`]
row_groups_spatial_matched: Count,
row_groups_spatial_pruned: PruningMetrics,
}

impl GeoParquetFileOpenerMetrics {
pub fn new(execution_plan_global_metrics: &ExecutionPlanMetricsSet) -> Self {
let files_ranges_spatial_pruned = PruningMetrics::new();
MetricBuilder::new(execution_plan_global_metrics)
.with_type(MetricType::SUMMARY)
.build(MetricValue::PruningMetrics {
name: "files_ranges_spatial_pruned".into(),
pruning_metrics: files_ranges_spatial_pruned.clone(),
});

let row_groups_spatial_pruned = PruningMetrics::new();
MetricBuilder::new(execution_plan_global_metrics)
.with_type(MetricType::SUMMARY)
.build(MetricValue::PruningMetrics {
name: "row_groups_spatial_pruned".into(),
pruning_metrics: row_groups_spatial_pruned.clone(),
});

Self {
files_ranges_spatial_pruned: MetricBuilder::new(execution_plan_global_metrics)
.global_counter("files_ranges_spatial_pruned"),
files_ranges_spatial_matched: MetricBuilder::new(execution_plan_global_metrics)
.global_counter("files_ranges_spatial_matched"),
row_groups_spatial_pruned: MetricBuilder::new(execution_plan_global_metrics)
.global_counter("row_groups_spatial_pruned"),
row_groups_spatial_matched: MetricBuilder::new(execution_plan_global_metrics)
.global_counter("row_groups_spatial_matched"),
files_ranges_spatial_pruned,
row_groups_spatial_pruned,
}
}
}
Expand Down Expand Up @@ -171,12 +179,12 @@ fn filter_access_plan_using_geoparquet_file_metadata(
let table_geo_stats =
TableGeoStatistics::try_from_stats_and_schema(&column_geo_stats, file_schema)?;
if !spatial_filter.evaluate(&table_geo_stats)? {
metrics.files_ranges_spatial_pruned.add(1);
metrics.files_ranges_spatial_pruned.add_pruned(1);
for i in access_plan.row_group_indexes() {
access_plan.skip(i);
}
} else {
metrics.files_ranges_spatial_matched.add(1);
metrics.files_ranges_spatial_pruned.add_matched(1);
}

Ok(())
Expand Down Expand Up @@ -206,8 +214,8 @@ fn filter_access_plan_using_geoparquet_covering(
let covering_specs = parse_column_coverings(file_schema, parquet_metadata, metadata)?;

// If there are no covering specs, don't iterate through the row groups
// This has the side-effect of ensuring the row_groups_spatial_matched metric is not double
// counted except in the rare case where we prune based on both.
// This has the side-effect of ensuring the matched count of row_groups_spatial_pruned is not
// double-counted, except in the rare case where we prune based on both the GeoParquet
// covering columns and the native geometry/geography statistics.
if covering_specs.iter().all(|spec| spec.is_none()) {
return Ok(());
}
Expand All @@ -224,10 +232,10 @@ fn filter_access_plan_using_geoparquet_covering(

// Evaluate predicate!
if !spatial_filter.evaluate(&row_group_geo_stats)? {
metrics.row_groups_spatial_pruned.add(1);
metrics.row_groups_spatial_pruned.add_pruned(1);
access_plan.skip(i);
} else {
metrics.row_groups_spatial_matched.add(1);
metrics.row_groups_spatial_pruned.add_matched(1);
}
}

Expand Down Expand Up @@ -259,7 +267,7 @@ fn filter_access_plan_using_native_geostats(

// If there are no native geometry or geography logical types at the
// top level indices, don't iterate through the row groups. This has the side-effect
// of ensuring the row_groups_spatial_matched metric is not double counted except
// of ensuring the matched count of row_groups_spatial_pruned is not double-counted, except
// in the rare case where we prune based on both the GeoParquet covering columns and the
// native geometry/geography statistics.
let parquet_schema = parquet_metadata.file_metadata().schema_descr();
if top_level_indices.iter().all(|i| {
Expand All @@ -283,10 +291,10 @@ fn filter_access_plan_using_native_geostats(

// Evaluate predicate!
if !spatial_filter.evaluate(&row_group_geo_stats)? {
metrics.row_groups_spatial_pruned.add(1);
metrics.row_groups_spatial_pruned.add_pruned(1);
access_plan.skip(i);
} else {
metrics.row_groups_spatial_matched.add(1);
metrics.row_groups_spatial_pruned.add_matched(1);
}
}

Expand Down
12 changes: 4 additions & 8 deletions rust/sedona/tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ async fn geo_parquet_metrics() {
"#;

let prune_plan = run_and_format(&ctx, prune_query).await;
assert!(prune_plan.contains("files_ranges_spatial_pruned=1"));
assert!(prune_plan.contains("files_ranges_spatial_matched=0"));
assert!(prune_plan.contains("row_groups_spatial_pruned=0"));
assert!(prune_plan.contains("row_groups_spatial_matched=0"));
assert!(prune_plan.contains("files_ranges_spatial_pruned=1 total \u{2192} 0 matched"));
assert!(prune_plan.contains("row_groups_spatial_pruned=0 total \u{2192} 0 matched"));

// Test 2: query with spatial filter that can't skip any file or row group
// -----------------------------------------------------------------------
Expand All @@ -79,10 +77,8 @@ async fn geo_parquet_metrics() {
"#;

let match_plan = run_and_format(&ctx, match_query).await;
assert!(match_plan.contains("files_ranges_spatial_pruned=0"));
assert!(match_plan.contains("files_ranges_spatial_matched=1"));
assert!(match_plan.contains("row_groups_spatial_pruned=0"));
assert!(match_plan.contains("row_groups_spatial_matched=1"));
assert!(match_plan.contains("files_ranges_spatial_pruned=1 total \u{2192} 1 matched"));
assert!(match_plan.contains("row_groups_spatial_pruned=1 total \u{2192} 1 matched"));
}

async fn run_and_format(ctx: &SedonaContext, sql: &str) -> String {
Expand Down