Skip to content

Commit 8a91db5

Browse files
adriangbmartin-g
andauthored
Move statistics handling into FileScanConfig (#18721)
## Summary This PR moves statistics handling from individual `FileSource` implementations into `FileScanConfig`, simplifying the `FileSource` trait interface. The `FileSource`s were all acting as a container for the statistics but never actually using them. Since `FileScanConfig` deals with file-level things (which the statistics are) it is better equipped to deal with it. ### Changes - **FileSource trait simplification**: Removed `statistics()`, `with_statistics()`, and `with_projection()` methods - **FileScanConfig enhancement**: Added `statistics` field and `statistics()` method - **FileSource implementations updated**: Removed `projected_statistics` field from all implementations: - ParquetSource - CsvSource - JsonSource - AvroSource - ArrowFileSource and ArrowStreamFileSource - MockSource (test utility) - **Test utilities**: Updated assertions to use `config.statistics()` instead of `file_source.statistics()` - **Proto serialization**: Updated to use `config.statistics()` ### Benefits 1. **Simpler trait interface**: `FileSource` implementations no longer need to manage statistics 2. **Centralized statistics**: All statistics are now managed consistently in `FileScanConfig` 3. **Cleaner API**: Statistics lifecycle is clearer and less error-prone 4. **Reduced code duplication**: Removes ~140 lines of boilerplate across implementations ### Related This is part of the projection refactoring work in #18627. This PR extracts just the statistics-related changes to make review easier. The full projection refactoring will come in subsequent PRs. ## Test plan - [x] All modified file source implementations compile - [x] Test utilities updated and compile - [x] CI tests pass (will verify after PR creation) 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Co-authored-by: Martin Grigorov <[email protected]>
1 parent 7d8b860 commit 8a91db5

File tree

13 files changed

+126
-171
lines changed

13 files changed

+126
-171
lines changed

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use arrow::datatypes::SchemaRef;
1919
use arrow::{array::RecordBatch, compute::concat_batches};
2020
use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
21-
use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics};
21+
use datafusion_common::{config::ConfigOptions, internal_err, Result};
2222
use datafusion_datasource::{
2323
file::FileSource, file_scan_config::FileScanConfig,
2424
file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
@@ -108,7 +108,6 @@ impl FileOpener for TestOpener {
108108
pub struct TestSource {
109109
support: bool,
110110
predicate: Option<Arc<dyn PhysicalExpr>>,
111-
statistics: Option<Statistics>,
112111
batch_size: Option<usize>,
113112
batches: Vec<RecordBatch>,
114113
schema: SchemaRef,
@@ -128,7 +127,6 @@ impl TestSource {
128127
metrics: ExecutionPlanMetricsSet::new(),
129128
batches,
130129
predicate: None,
131-
statistics: None,
132130
batch_size: None,
133131
projection: None,
134132
schema_adapter_factory: None,
@@ -175,25 +173,10 @@ impl FileSource for TestSource {
175173
})
176174
}
177175

178-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
179-
Arc::new(TestSource {
180-
statistics: Some(statistics),
181-
..self.clone()
182-
})
183-
}
184-
185176
fn metrics(&self) -> &ExecutionPlanMetricsSet {
186177
&self.metrics
187178
}
188179

189-
fn statistics(&self) -> Result<Statistics> {
190-
Ok(self
191-
.statistics
192-
.as_ref()
193-
.expect("statistics not set")
194-
.clone())
195-
}
196-
197180
fn file_type(&self) -> &str {
198181
"test"
199182
}

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,7 @@ pub(crate) fn parquet_exec_with_stats(file_size: u64) -> Arc<DataSourceExec> {
131131
.with_statistics(statistics)
132132
.build();
133133

134-
assert_eq!(
135-
config.file_source.statistics().unwrap().num_rows,
136-
Precision::Inexact(10000)
137-
);
134+
assert_eq!(config.statistics().num_rows, Precision::Inexact(10000));
138135
DataSourceExec::from_data_source(config)
139136
}
140137

datafusion/datasource-arrow/src/source.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ use datafusion_datasource::{as_file_source, TableSchema};
4040
use arrow::buffer::Buffer;
4141
use arrow::ipc::reader::{FileDecoder, FileReader, StreamReader};
4242
use datafusion_common::error::Result;
43-
use datafusion_common::{exec_datafusion_err, Statistics};
43+
use datafusion_common::exec_datafusion_err;
4444
use datafusion_datasource::file::FileSource;
4545
use datafusion_datasource::file_scan_config::FileScanConfig;
4646
use datafusion_datasource::PartitionedFile;
@@ -250,7 +250,6 @@ pub struct ArrowSource {
250250
format: ArrowFormat,
251251
table_schema: TableSchema,
252252
metrics: ExecutionPlanMetricsSet,
253-
projected_statistics: Option<Statistics>,
254253
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
255254
}
256255

@@ -261,7 +260,6 @@ impl ArrowSource {
261260
format: ArrowFormat::File,
262261
table_schema: table_schema.into(),
263262
metrics: ExecutionPlanMetricsSet::new(),
264-
projected_statistics: None,
265263
schema_adapter_factory: None,
266264
}
267265
}
@@ -272,7 +270,6 @@ impl ArrowSource {
272270
format: ArrowFormat::Stream,
273271
table_schema: table_schema.into(),
274272
metrics: ExecutionPlanMetricsSet::new(),
275-
projected_statistics: None,
276273
schema_adapter_factory: None,
277274
}
278275
}
@@ -305,12 +302,6 @@ impl FileSource for ArrowSource {
305302
Arc::new(Self { ..self.clone() })
306303
}
307304

308-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
309-
let mut conf = self.clone();
310-
conf.projected_statistics = Some(statistics);
311-
Arc::new(conf)
312-
}
313-
314305
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
315306
Arc::new(Self { ..self.clone() })
316307
}
@@ -319,13 +310,6 @@ impl FileSource for ArrowSource {
319310
&self.metrics
320311
}
321312

322-
fn statistics(&self) -> Result<Statistics> {
323-
let statistics = &self.projected_statistics;
324-
Ok(statistics
325-
.clone()
326-
.expect("projected_statistics must be set"))
327-
}
328-
329313
fn file_type(&self) -> &str {
330314
match self.format {
331315
ArrowFormat::File => "arrow",

datafusion/datasource-avro/src/source.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use std::sync::Arc;
2323
use crate::avro_to_arrow::Reader as AvroReader;
2424

2525
use datafusion_common::error::Result;
26-
use datafusion_common::Statistics;
2726
use datafusion_datasource::file::FileSource;
2827
use datafusion_datasource::file_scan_config::FileScanConfig;
2928
use datafusion_datasource::file_stream::FileOpener;
@@ -41,7 +40,6 @@ pub struct AvroSource {
4140
batch_size: Option<usize>,
4241
projection: Option<Vec<String>>,
4342
metrics: ExecutionPlanMetricsSet,
44-
projected_statistics: Option<Statistics>,
4543
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
4644
}
4745

@@ -53,7 +51,6 @@ impl AvroSource {
5351
batch_size: None,
5452
projection: None,
5553
metrics: ExecutionPlanMetricsSet::new(),
56-
projected_statistics: None,
5754
schema_adapter_factory: None,
5855
}
5956
}
@@ -95,12 +92,6 @@ impl FileSource for AvroSource {
9592
Arc::new(conf)
9693
}
9794

98-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
99-
let mut conf = self.clone();
100-
conf.projected_statistics = Some(statistics);
101-
Arc::new(conf)
102-
}
103-
10495
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
10596
let mut conf = self.clone();
10697
conf.projection = config.projected_file_column_names();
@@ -111,13 +102,6 @@ impl FileSource for AvroSource {
111102
&self.metrics
112103
}
113104

114-
fn statistics(&self) -> Result<Statistics> {
115-
let statistics = &self.projected_statistics;
116-
Ok(statistics
117-
.clone()
118-
.expect("projected_statistics must be set"))
119-
}
120-
121105
fn file_type(&self) -> &str {
122106
"avro"
123107
}

datafusion/datasource-csv/src/source.rs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion_datasource::{
3434

3535
use arrow::csv;
3636
use datafusion_common::config::CsvOptions;
37-
use datafusion_common::{DataFusionError, Result, Statistics};
37+
use datafusion_common::{DataFusionError, Result};
3838
use datafusion_common_runtime::JoinSet;
3939
use datafusion_datasource::file::FileSource;
4040
use datafusion_datasource::file_scan_config::FileScanConfig;
@@ -90,7 +90,6 @@ pub struct CsvSource {
9090
table_schema: TableSchema,
9191
file_projection: Option<Vec<usize>>,
9292
metrics: ExecutionPlanMetricsSet,
93-
projected_statistics: Option<Statistics>,
9493
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
9594
}
9695

@@ -103,7 +102,6 @@ impl CsvSource {
103102
batch_size: None,
104103
file_projection: None,
105104
metrics: ExecutionPlanMetricsSet::new(),
106-
projected_statistics: None,
107105
schema_adapter_factory: None,
108106
}
109107
}
@@ -266,12 +264,6 @@ impl FileSource for CsvSource {
266264
Arc::new(conf)
267265
}
268266

269-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
270-
let mut conf = self.clone();
271-
conf.projected_statistics = Some(statistics);
272-
Arc::new(conf)
273-
}
274-
275267
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
276268
let mut conf = self.clone();
277269
conf.file_projection = config.file_column_projection_indices();
@@ -281,12 +273,7 @@ impl FileSource for CsvSource {
281273
fn metrics(&self) -> &ExecutionPlanMetricsSet {
282274
&self.metrics
283275
}
284-
fn statistics(&self) -> Result<Statistics> {
285-
let statistics = &self.projected_statistics;
286-
Ok(statistics
287-
.clone()
288-
.expect("projected_statistics must be set"))
289-
}
276+
290277
fn file_type(&self) -> &str {
291278
"csv"
292279
}

datafusion/datasource-json/src/source.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
3737

3838
use arrow::json::ReaderBuilder;
3939
use arrow::{datatypes::SchemaRef, json};
40-
use datafusion_common::Statistics;
4140
use datafusion_datasource::file::FileSource;
4241
use datafusion_datasource::file_scan_config::FileScanConfig;
4342
use datafusion_execution::TaskContext;
@@ -79,7 +78,6 @@ pub struct JsonSource {
7978
table_schema: datafusion_datasource::TableSchema,
8079
batch_size: Option<usize>,
8180
metrics: ExecutionPlanMetricsSet,
82-
projected_statistics: Option<Statistics>,
8381
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
8482
}
8583

@@ -90,7 +88,6 @@ impl JsonSource {
9088
table_schema: table_schema.into(),
9189
batch_size: None,
9290
metrics: ExecutionPlanMetricsSet::new(),
93-
projected_statistics: None,
9491
schema_adapter_factory: None,
9592
}
9693
}
@@ -133,12 +130,6 @@ impl FileSource for JsonSource {
133130
Arc::new(conf)
134131
}
135132

136-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
137-
let mut conf = self.clone();
138-
conf.projected_statistics = Some(statistics);
139-
Arc::new(conf)
140-
}
141-
142133
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
143134
Arc::new(Self { ..self.clone() })
144135
}
@@ -147,13 +138,6 @@ impl FileSource for JsonSource {
147138
&self.metrics
148139
}
149140

150-
fn statistics(&self) -> Result<Statistics> {
151-
let statistics = &self.projected_statistics;
152-
Ok(statistics
153-
.clone()
154-
.expect("projected_statistics must be set to call"))
155-
}
156-
157141
fn file_type(&self) -> &str {
158142
"json"
159143
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use datafusion_datasource::schema_adapter::{
3737

3838
use arrow::datatypes::TimeUnit;
3939
use datafusion_common::config::TableParquetOptions;
40-
use datafusion_common::{DataFusionError, Statistics};
40+
use datafusion_common::DataFusionError;
4141
use datafusion_datasource::file::FileSource;
4242
use datafusion_datasource::file_scan_config::FileScanConfig;
4343
use datafusion_datasource::TableSchema;
@@ -286,7 +286,6 @@ pub struct ParquetSource {
286286
pub(crate) batch_size: Option<usize>,
287287
/// Optional hint for the size of the parquet metadata
288288
pub(crate) metadata_size_hint: Option<usize>,
289-
pub(crate) projected_statistics: Option<Statistics>,
290289
#[cfg(feature = "parquet_encryption")]
291290
pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
292291
}
@@ -307,7 +306,6 @@ impl ParquetSource {
307306
schema_adapter_factory: None,
308307
batch_size: None,
309308
metadata_size_hint: None,
310-
projected_statistics: None,
311309
#[cfg(feature = "parquet_encryption")]
312310
encryption_factory: None,
313311
}
@@ -625,12 +623,6 @@ impl FileSource for ParquetSource {
625623
Arc::new(conf)
626624
}
627625

628-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
629-
let mut conf = self.clone();
630-
conf.projected_statistics = Some(statistics);
631-
Arc::new(conf)
632-
}
633-
634626
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
635627
Arc::new(Self { ..self.clone() })
636628
}
@@ -639,23 +631,6 @@ impl FileSource for ParquetSource {
639631
&self.metrics
640632
}
641633

642-
fn statistics(&self) -> datafusion_common::Result<Statistics> {
643-
let statistics = &self.projected_statistics;
644-
let statistics = statistics
645-
.clone()
646-
.expect("projected_statistics must be set");
647-
// When filters are pushed down, we have no way of knowing the exact statistics.
648-
// Note that pruning predicate is also a kind of filter pushdown.
649-
// (bloom filters use `pruning_predicate` too).
650-
// Because filter pushdown may happen dynamically as long as there is a predicate
651-
// if we have *any* predicate applied, we can't guarantee the statistics are exact.
652-
if self.filter().is_some() {
653-
Ok(statistics.to_inexact())
654-
} else {
655-
Ok(statistics)
656-
}
657-
}
658-
659634
fn file_type(&self) -> &str {
660635
"parquet"
661636
}

datafusion/datasource/src/file.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::file_scan_config::FileScanConfig;
2727
use crate::file_stream::FileOpener;
2828
use crate::schema_adapter::SchemaAdapterFactory;
2929
use datafusion_common::config::ConfigOptions;
30-
use datafusion_common::{not_impl_err, Result, Statistics};
30+
use datafusion_common::{not_impl_err, Result};
3131
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
3232
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
3333
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -68,16 +68,12 @@ pub trait FileSource: Send + Sync {
6868
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
6969
/// Initialize new instance with projection information
7070
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
71-
/// Initialize new instance with projected statistics
72-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
7371
/// Returns the filter expression that will be applied during the file scan.
7472
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
7573
None
7674
}
7775
/// Return execution plan metrics
7876
fn metrics(&self) -> &ExecutionPlanMetricsSet;
79-
/// Return projected statistics
80-
fn statistics(&self) -> Result<Statistics>;
8177
/// String representation of file source such as "csv", "json", "parquet"
8278
fn file_type(&self) -> &str;
8379
/// Format FileType specific information

0 commit comments

Comments
 (0)