12 changes: 1 addition & 11 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -73,17 +73,7 @@ pub(crate) mod test_util {
         .infer_stats(state, &store, file_schema.clone(), &meta)
         .await?;

-    let file_groups = vec![
-        vec![PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        }]
-        .into(),
-    ];
+    let file_groups = vec![vec![PartitionedFile::new_from_meta(meta)].into()];

     let exec = format
         .create_physical_plan(
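Every hunk in this PR applies the same refactor: a hand-written `PartitionedFile` struct literal becomes a `PartitionedFile::new_from_meta` constructor call, with builder methods chained only for the optional fields a call site actually sets. A minimal sketch of the resulting call pattern, assuming an `ObjectMeta` is already at hand; the range and hint values below are illustrative, not taken from the PR:

```rust
use datafusion_datasource::PartitionedFile;
use object_store::ObjectMeta;

// Minimal sketch of the builder-style construction this PR adopts.
// new_from_meta wraps the ObjectMeta and defaults every optional field;
// the chained calls are only needed when a call site sets those fields.
fn example(meta: ObjectMeta) -> PartitionedFile {
    PartitionedFile::new_from_meta(meta)
        // Scan only the byte range [0, 1024); illustrative values.
        .with_range(0, 1024)
        // Hint how many trailing bytes to fetch for the Parquet footer.
        .with_metadata_size_hint(123)
}
```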
9 changes: 1 addition & 8 deletions datafusion/core/src/datasource/mod.rs
@@ -113,14 +113,7 @@ mod tests {
         version: None,
     };

-    let partitioned_file = PartitionedFile {
-        object_meta: meta,
-        partition_values: vec![],
-        range: None,
-        statistics: None,
-        extensions: None,
-        metadata_size_hint: None,
-    };
+    let partitioned_file = PartitionedFile::new_from_meta(meta);

     let f1 = Field::new("id", DataType::Int32, true);
     let f2 = Field::new("extra_column", DataType::Utf8, true);
84 changes: 25 additions & 59 deletions datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -54,7 +54,7 @@ mod tests {
     use datafusion_datasource::source::DataSourceExec;

     use datafusion_datasource::file::FileSource;
-    use datafusion_datasource::{FileRange, PartitionedFile, TableSchema};
+    use datafusion_datasource::{PartitionedFile, TableSchema};
     use datafusion_datasource_parquet::source::ParquetSource;
     use datafusion_datasource_parquet::{
         DefaultParquetFileReaderFactory, ParquetFileReaderFactory, ParquetFormat,
@@ -1527,14 +1527,7 @@ mod tests {
     #[tokio::test]
     async fn parquet_exec_with_range() -> Result<()> {
         fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
-            PartitionedFile {
-                object_meta: meta.clone(),
-                partition_values: vec![],
-                range: Some(FileRange { start, end }),
-                statistics: None,
-                extensions: None,
-                metadata_size_hint: None,
-            }
+            PartitionedFile::new_from_meta(meta.clone()).with_range(start, end)
         }

         async fn assert_parquet_read(
@@ -1616,21 +1609,15 @@ mod tests {
             .await
             .unwrap();

-        let partitioned_file = PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![
+        let partitioned_file = PartitionedFile::new_from_meta(meta)
+            .with_partition_values(vec![
                 ScalarValue::from("2021"),
                 ScalarValue::UInt8(Some(10)),
                 ScalarValue::Dictionary(
                     Box::new(DataType::UInt16),
                     Box::new(ScalarValue::from("26")),
                 ),
-            ],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+            ]);

         let expected_schema = Schema::new(vec![
             Field::new("id", DataType::Int32, true),
@@ -1711,20 +1698,13 @@ mod tests {
             .unwrap()
             .child("invalid.parquet");

-        let partitioned_file = PartitionedFile {
-            object_meta: ObjectMeta {
-                location,
-                last_modified: Utc.timestamp_nanos(0),
-                size: 1337,
-                e_tag: None,
-                version: None,
-            },
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let partitioned_file = PartitionedFile::new_from_meta(ObjectMeta {
+            location,
+            last_modified: Utc.timestamp_nanos(0),
+            size: 1337,
+            e_tag: None,
+            version: None,
+        });

         let file_schema = Arc::new(Schema::empty());
         let config = FileScanConfigBuilder::new(
@@ -2376,36 +2356,22 @@ mod tests {
         );
         let config = FileScanConfigBuilder::new(store_url, source)
             .with_file(
-                PartitionedFile {
-                    object_meta: ObjectMeta {
-                        location: Path::from(name_1),
-                        last_modified: Utc::now(),
-                        size: total_size_1,
-                        e_tag: None,
-                        version: None,
-                    },
-                    partition_values: vec![],
-                    range: None,
-                    statistics: None,
-                    extensions: None,
-                    metadata_size_hint: None,
-                }
-                .with_metadata_size_hint(123),
-            )
-            .with_file(PartitionedFile {
-                object_meta: ObjectMeta {
-                    location: Path::from(name_2),
-                    last_modified: Utc::now(),
-                    size: total_size_2,
-                    e_tag: None,
-                    version: None,
-                },
-                partition_values: vec![],
-                range: None,
-                statistics: None,
-                extensions: None,
-                metadata_size_hint: None,
-            })
+                PartitionedFile::new_from_meta(ObjectMeta {
+                    location: Path::from(name_1),
+                    last_modified: Utc::now(),
+                    size: total_size_1,
+                    e_tag: None,
+                    version: None,
+                })
+                .with_metadata_size_hint(123),
+            )
+            .with_file(PartitionedFile::new_from_meta(ObjectMeta {
+                location: Path::from(name_2),
+                last_modified: Utc::now(),
+                size: total_size_2,
+                e_tag: None,
+                version: None,
+            }))
             .build();

         let exec = DataSourceExec::from_data_source(config);
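One hunk above attaches Hive-style partition-column values via `with_partition_values`. A hedged sketch of that usage in isolation; the helper name and the single string-typed "year" column are assumptions for illustration:

```rust
use datafusion_common::ScalarValue;
use datafusion_datasource::PartitionedFile;
use object_store::ObjectMeta;

// Hypothetical helper mirroring the test above: the vector entries must
// line up positionally with the table's declared partition columns
// (a single "year" column is assumed here).
fn file_in_year_partition(meta: ObjectMeta, year: &str) -> PartitionedFile {
    PartitionedFile::new_from_meta(meta)
        .with_partition_values(vec![ScalarValue::from(year)])
}
```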
12 changes: 2 additions & 10 deletions datafusion/core/src/test_util/parquet.rs
@@ -160,16 +160,8 @@ impl TestParquetFile {
                 .with_table_parquet_options(parquet_options.clone()),
         );
         let scan_config_builder =
-            FileScanConfigBuilder::new(self.object_store_url.clone(), source).with_file(
-                PartitionedFile {
-                    object_meta: self.object_meta.clone(),
-                    partition_values: vec![],
-                    range: None,
-                    statistics: None,
-                    extensions: None,
-                    metadata_size_hint: None,
-                },
-            );
+            FileScanConfigBuilder::new(self.object_store_url.clone(), source)
+                .with_file(PartitionedFile::new_from_meta(self.object_meta.clone()));

         let df_schema = Arc::clone(&self.schema).to_dfschema_ref()?;
10 changes: 3 additions & 7 deletions datafusion/core/tests/parquet/custom_reader.rs
@@ -69,13 +69,9 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
         store_parquet_in_memory(vec![batch]).await;
     let file_group = parquet_files_meta
         .into_iter()
-        .map(|meta| PartitionedFile {
-            object_meta: meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: Some(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA))),
-            metadata_size_hint: None,
+        .map(|meta| {
+            PartitionedFile::new_from_meta(meta)
+                .with_extensions(Arc::new(String::from(EXPECTED_USER_DEFINED_METADATA)))
         })
         .collect();
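The `with_extensions` call above threads opaque user data through to a custom `ParquetFileReaderFactory`. A sketch of reading it back out on the factory side, assuming `extensions` is the `Option<Arc<dyn Any + Send + Sync>>` field seen in the removed struct literals:

```rust
use datafusion_datasource::PartitionedFile;

// Sketch: recover the user-defined payload attached via with_extensions.
// Downcasting to String matches this test's payload; a real factory would
// downcast to whatever type it stored.
fn user_metadata(file: &PartitionedFile) -> Option<&str> {
    file.extensions
        .as_ref()?
        .downcast_ref::<String>()
        .map(String::as_str)
}
```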
9 changes: 1 addition & 8 deletions datafusion/core/tests/parquet/page_pruning.rs
@@ -67,14 +67,7 @@ async fn get_parquet_exec(
         .await
         .unwrap();

-    let partitioned_file = PartitionedFile {
-        object_meta: meta,
-        partition_values: vec![],
-        range: None,
-        statistics: None,
-        extensions: None,
-        metadata_size_hint: None,
-    };
+    let partitioned_file = PartitionedFile::new_from_meta(meta);

     let df_schema = schema.clone().to_dfschema().unwrap();
     let execution_props = ExecutionProps::new();
9 changes: 1 addition & 8 deletions datafusion/datasource-parquet/src/row_group_filter.rs
@@ -1559,14 +1559,7 @@ mod tests {
             ParquetObjectReader::new(Arc::new(in_memory), object_meta.location.clone())
                 .with_file_size(object_meta.size);

-        let partitioned_file = PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        };
+        let partitioned_file = PartitionedFile::new_from_meta(object_meta);

         let reader = ParquetFileReader {
             inner,
9 changes: 1 addition & 8 deletions datafusion/datasource/src/display.rs
@@ -287,13 +287,6 @@ mod tests {
             version: None,
         };

-        PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            statistics: None,
-            extensions: None,
-            metadata_size_hint: None,
-        }
+        PartitionedFile::new_from_meta(object_meta)
     }
 }
71 changes: 34 additions & 37 deletions datafusion/datasource/src/file_scan_config.rs
@@ -1661,43 +1661,40 @@ mod tests {

     impl From<File> for PartitionedFile {
         fn from(file: File) -> Self {
-            PartitionedFile {
-                object_meta: ObjectMeta {
-                    location: Path::from(format!(
-                        "data/date={}/{}.parquet",
-                        file.date, file.name
-                    )),
-                    last_modified: chrono::Utc.timestamp_nanos(0),
-                    size: 0,
-                    e_tag: None,
-                    version: None,
-                },
-                partition_values: vec![ScalarValue::from(file.date)],
-                range: None,
-                statistics: Some(Arc::new(Statistics {
-                    num_rows: Precision::Absent,
-                    total_byte_size: Precision::Absent,
-                    column_statistics: file
-                        .statistics
-                        .into_iter()
-                        .map(|stats| {
-                            stats
-                                .map(|(min, max)| ColumnStatistics {
-                                    min_value: Precision::Exact(
-                                        ScalarValue::Float64(min),
-                                    ),
-                                    max_value: Precision::Exact(
-                                        ScalarValue::Float64(max),
-                                    ),
-                                    ..Default::default()
-                                })
-                                .unwrap_or_default()
-                        })
-                        .collect::<Vec<_>>(),
-                })),
-                extensions: None,
-                metadata_size_hint: None,
-            }
+            let object_meta = ObjectMeta {
+                location: Path::from(format!(
+                    "data/date={}/{}.parquet",
+                    file.date, file.name
+                )),
+                last_modified: chrono::Utc.timestamp_nanos(0),
+                size: 0,
+                e_tag: None,
+                version: None,
+            };
+            let statistics = Arc::new(Statistics {
+                num_rows: Precision::Absent,
+                total_byte_size: Precision::Absent,
+                column_statistics: file
+                    .statistics
+                    .into_iter()
+                    .map(|stats| {
+                        stats
+                            .map(|(min, max)| ColumnStatistics {
+                                min_value: Precision::Exact(ScalarValue::Float64(
+                                    min,
+                                )),
+                                max_value: Precision::Exact(ScalarValue::Float64(
+                                    max,
+                                )),
+                                ..Default::default()
+                            })
+                            .unwrap_or_default()
+                    })
+                    .collect::<Vec<_>>(),
+            });
+            PartitionedFile::new_from_meta(object_meta)
+                .with_partition_values(vec![ScalarValue::from(file.date)])
+                .with_statistics(statistics)
         }
     }
 }
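The last hunk also moves per-file statistics onto the builder via `with_statistics`. A minimal self-contained sketch of the same pattern, assuming the `datafusion_common` import paths below; the single Float64 column is illustrative:

```rust
use std::sync::Arc;

use datafusion_common::stats::Precision;
use datafusion_common::{ColumnStatistics, ScalarValue, Statistics};
use datafusion_datasource::PartitionedFile;
use object_store::ObjectMeta;

// Sketch: attach exact min/max statistics for a single Float64 column,
// mirroring the From<File> impl rewritten above.
fn file_with_stats(meta: ObjectMeta, min: f64, max: f64) -> PartitionedFile {
    let statistics = Statistics {
        num_rows: Precision::Absent,
        total_byte_size: Precision::Absent,
        column_statistics: vec![ColumnStatistics {
            min_value: Precision::Exact(ScalarValue::Float64(Some(min))),
            max_value: Precision::Exact(ScalarValue::Float64(Some(max))),
            ..Default::default()
        }],
    };
    PartitionedFile::new_from_meta(meta).with_statistics(Arc::new(statistics))
}
```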