-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Validate parquet writer version #19515
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
eb3e843
a711910
11a8fe1
21cf45c
cf66a8e
c880680
5206884
87a27ae
6abc5ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,108 @@ | ||
| // Licensed to the Apache Software Foundation (ASF) under one | ||
| // or more contributor license agreements. See the NOTICE file | ||
| // distributed with this work for additional information | ||
| // regarding copyright ownership. The ASF licenses this file | ||
| // to you under the Apache License, Version 2.0 (the | ||
| // "License"); you may not use this file except in compliance | ||
| // with the License. You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, | ||
| // software distributed under the License is distributed on an | ||
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| // KIND, either express or implied. See the License for the | ||
| // specific language governing permissions and limitations | ||
| // under the License. | ||
|
|
||
| use std::fmt::{self, Display}; | ||
| use std::str::FromStr; | ||
|
|
||
| use crate::config::{ConfigField, Visit}; | ||
| use crate::error::{DataFusionError, Result}; | ||
|
|
||
| /// Parquet writer version options for controlling the Parquet file format version. | ||
| /// | ||
| /// This enum validates parquet writer version values at configuration time, | ||
| /// ensuring only valid versions ("1.0" or "2.0") can be set via `SET` commands | ||
| /// or proto deserialization. Any other string is rejected with a configuration | ||
| /// error when it is parsed through the `FromStr` implementation. | ||
| /// | ||
| /// The default value is `V1_0`. The enum is `Copy`, so it can be passed by value. | ||
| #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] | ||
| pub enum DFParquetWriterVersion { | ||
| /// Parquet format version 1.0 (the default); parsed from and displayed as "1.0" | ||
| #[default] | ||
| V1_0, | ||
| /// Parquet format version 2.0; parsed from and displayed as "2.0" | ||
| V2_0, | ||
| } | ||
|
|
||
| /// Implement parsing strings to `DFParquetWriterVersion` | ||
| /// | ||
| /// Only the exact strings "1.0" and "2.0" are accepted. | ||
| impl FromStr for DFParquetWriterVersion { | ||
| type Err = DataFusionError; | ||
|
|
||
| /// Parses `s` into a writer version. | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns `DataFusionError::Configuration` naming the rejected input when | ||
| /// `s` is neither "1.0" nor "2.0". | ||
| fn from_str(s: &str) -> Result<Self, Self::Err> { | ||
| // The accepted literals contain no cased characters, so case folding | ||
| // cannot change which inputs match. Matching on `s` directly avoids an | ||
| // allocation and lets the error echo the input exactly as supplied. | ||
| match s { | ||
| "1.0" => Ok(DFParquetWriterVersion::V1_0), | ||
| "2.0" => Ok(DFParquetWriterVersion::V2_0), | ||
| other => Err(DataFusionError::Configuration(format!( | ||
| "Invalid parquet writer version: {other}. Expected one of: 1.0, 2.0" | ||
| ))), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Render a `DFParquetWriterVersion` as its version string ("1.0" or "2.0") | ||
| impl Display for DFParquetWriterVersion { | ||
| fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||
| // Each variant maps to a fixed literal, written straight to the formatter. | ||
| match self { | ||
| Self::V1_0 => f.write_str("1.0"), | ||
| Self::V2_0 => f.write_str("2.0"), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Expose `DFParquetWriterVersion` as a configuration field | ||
| impl ConfigField for DFParquetWriterVersion { | ||
| /// Hands the current value to the visitor under `key`, together with `description`. | ||
| fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) { | ||
| v.some(key, self, description); | ||
| } | ||
|
|
||
| /// Overwrites `self` with the version parsed from `value`; the key argument | ||
| /// is ignored. On a parse error the error is returned and `self` is left untouched. | ||
| fn set(&mut self, _: &str, value: &str) -> Result<()> { | ||
| value.parse::<Self>().map(|parsed| *self = parsed) | ||
| } | ||
| } | ||
|
|
||
| /// Map a `DFParquetWriterVersion` onto the parquet crate's `WriterVersion` | ||
| /// | ||
| /// The mapping is total: each variant has exactly one counterpart, so the | ||
| /// conversion cannot fail. | ||
| #[cfg(feature = "parquet")] | ||
| impl From<DFParquetWriterVersion> for parquet::file::properties::WriterVersion { | ||
| fn from(value: DFParquetWriterVersion) -> Self { | ||
| // `Self` is the parquet crate's `WriterVersion` here. | ||
| match value { | ||
| DFParquetWriterVersion::V1_0 => Self::PARQUET_1_0, | ||
| DFParquetWriterVersion::V2_0 => Self::PARQUET_2_0, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Map the parquet crate's `WriterVersion` back onto `DFParquetWriterVersion` | ||
| /// | ||
| /// Useful when starting from existing parquet writer properties, | ||
| /// such as when reading from proto or in test code. | ||
| #[cfg(feature = "parquet")] | ||
| impl From<parquet::file::properties::WriterVersion> for DFParquetWriterVersion { | ||
| fn from(version: parquet::file::properties::WriterVersion) -> Self { | ||
| use parquet::file::properties::WriterVersion; | ||
| match version { | ||
| WriterVersion::PARQUET_1_0 => Self::V1_0, | ||
| WriterVersion::PARQUET_2_0 => Self::V2_0, | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -381,7 +381,7 @@ mod parquet { | |
| force_filter_selections: global_options.global.force_filter_selections, | ||
| data_pagesize_limit: global_options.global.data_pagesize_limit as u64, | ||
| write_batch_size: global_options.global.write_batch_size as u64, | ||
| writer_version: global_options.global.writer_version.clone(), | ||
| writer_version: global_options.global.writer_version.to_string(), | ||
| compression_opt: global_options.global.compression.map(|compression| { | ||
| parquet_options::CompressionOpt::Compression(compression) | ||
| }), | ||
|
|
@@ -477,7 +477,10 @@ mod parquet { | |
| force_filter_selections: proto.force_filter_selections, | ||
| data_pagesize_limit: proto.data_pagesize_limit as usize, | ||
| write_batch_size: proto.write_batch_size as usize, | ||
| writer_version: proto.writer_version.clone(), | ||
| // TODO: Consider changing to TryFrom to avoid panic on invalid proto data | ||
| writer_version: proto.writer_version.parse().expect(" | ||
| Invalid parquet writer version in proto, expected '1.0' or '2.0' | ||
| "), | ||
|
Comment on lines
+481
to
+483
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We'll probably need to look into this in the future, to change this to a `TryFrom` conversion so that invalid proto data returns an error instead of panicking.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I left a TODO comment for that now |
||
| compression: proto.compression_opt.as_ref().map(|opt| match opt { | ||
| parquet_options::CompressionOpt::Compression(compression) => compression.clone(), | ||
| }), | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor nit -- as a follow on PR it might be nice to put this module in
`config/parquet.rs` or something rather than make a new top-level module in `datafusion-common`.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh yes, it will get messy, especially when we add the rest of the validations.
What do you think about adding a new module like
`common/src/config_validations`, moving `parquet_config.rs` and `format.rs` (implemented in #17549) into it, and completing there the remaining validations that will be added in the follow-up PRs closing #17498?