From c8c789e1b68a85bf0792aadedc247722fdf3a2f0 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 25 Mar 2026 21:57:17 +0000 Subject: [PATCH 1/5] feat!: Enhance compression codec enum. - Add optional compression level to gzip and zstd (needed for when avro compression usage). - Add Snappy as a compression codec (also will be used for Avro) --- .../iceberg/src/catalog/metadata_location.rs | 12 +- crates/iceberg/src/compression.rs | 107 ++++++++++++++---- crates/iceberg/src/puffin/metadata.rs | 2 +- crates/iceberg/src/puffin/mod.rs | 43 +++---- crates/iceberg/src/puffin/reader.rs | 4 +- crates/iceberg/src/puffin/test_utils.rs | 4 +- crates/iceberg/src/puffin/writer.rs | 8 +- crates/iceberg/src/spec/table_metadata.rs | 6 +- crates/iceberg/src/spec/table_properties.rs | 16 +-- 9 files changed, 136 insertions(+), 66 deletions(-) diff --git a/crates/iceberg/src/catalog/metadata_location.rs b/crates/iceberg/src/catalog/metadata_location.rs index ed28118879..18b795408b 100644 --- a/crates/iceberg/src/catalog/metadata_location.rs +++ b/crates/iceberg/src/catalog/metadata_location.rs @@ -114,9 +114,9 @@ impl MetadataLocation { ))?; // Check for compression suffix (e.g., .gz) - let gzip_suffix = CompressionCodec::Gzip.suffix()?; + let gzip_suffix = CompressionCodec::Gzip(None).suffix()?; let (stripped, compression_codec) = if let Some(s) = stripped.strip_suffix(gzip_suffix) { - (s, CompressionCodec::Gzip) + (s, CompressionCodec::Gzip(None)) } else { (stripped, CompressionCodec::None) }; @@ -261,7 +261,7 @@ mod test { table_location: "/abc".to_string(), version: 1234567, id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(), - compression_codec: CompressionCodec::Gzip, + compression_codec: CompressionCodec::Gzip(None), }), ), // Negative version @@ -345,10 +345,10 @@ mod test { "/test/table/metadata/00005-81056704-ce5b-41c4-bb83-eb6408081af6.gz.metadata.json", ) .unwrap(); - assert_eq!(location_gzip.compression_codec, CompressionCodec::Gzip); + assert_eq!(location_gzip.compression_codec, CompressionCodec::Gzip(None)); let next_gzip = location_gzip.with_next_version(); - assert_eq!(next_gzip.compression_codec, CompressionCodec::Gzip); + assert_eq!(next_gzip.compression_codec, CompressionCodec::Gzip(None)); assert_eq!(next_gzip.version, 6); } @@ -369,7 +369,7 @@ mod test { ); let metadata_gzip = create_test_metadata(props_gzip); let updated_gzip = location.with_new_metadata(&metadata_gzip); - assert_eq!(updated_gzip.compression_codec, CompressionCodec::Gzip); + assert_eq!(updated_gzip.compression_codec, CompressionCodec::Gzip(None)); assert_eq!(updated_gzip.version, 0); assert_eq!( updated_gzip.to_string(), diff --git a/crates/iceberg/src/compression.rs b/crates/iceberg/src/compression.rs index 42f5298437..fb7fffc5b5 100644 --- a/crates/iceberg/src/compression.rs +++ b/crates/iceberg/src/compression.rs @@ -17,28 +17,74 @@ //! Compression codec support for data compression and decompression. +use std::fmt; use std::io::{Read, Write}; use flate2::Compression; use flate2::read::GzDecoder; use flate2::write::GzEncoder; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::{Error, ErrorKind, Result}; /// Data compression formats -#[derive(Debug, PartialEq, Eq, Clone, Copy, Default, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] +#[derive(Debug, PartialEq, Eq, Clone, Copy, Default)] pub enum CompressionCodec { #[default] /// No compression None, /// LZ4 single compression frame with content size present Lz4, - /// Zstandard single compression frame with content size present - Zstd, - /// Gzip compression - Gzip, + /// Zstandard single compression frame with content size present. Optional level 0–22. + Zstd(Option), + /// Gzip compression. Optional level 0–9. + Gzip(Option), + /// Snappy compression + Snappy, +} + +impl Serialize for CompressionCodec { + fn serialize(&self, serializer: S) -> std::result::Result { + let s = match self { + CompressionCodec::None => "none", + CompressionCodec::Lz4 => "lz4", + CompressionCodec::Zstd(_) => "zstd", + CompressionCodec::Gzip(_) => "gzip", + CompressionCodec::Snappy => "snappy", + }; + serializer.serialize_str(s) + } +} + +impl<'de> Deserialize<'de> for CompressionCodec { + fn deserialize>(deserializer: D) -> std::result::Result { + let s = String::deserialize(deserializer)?; + match s.to_lowercase().as_str() { + "none" => Ok(CompressionCodec::None), + "lz4" => Ok(CompressionCodec::Lz4), + "zstd" => Ok(CompressionCodec::Zstd(None)), + "gzip" => Ok(CompressionCodec::Gzip(None)), + "snappy" => Ok(CompressionCodec::Snappy), + other => Err(serde::de::Error::unknown_variant( + other, + &["none", "lz4", "zstd", "gzip", "snappy"], + )), + } + } +} + +impl fmt::Display for CompressionCodec { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + CompressionCodec::None => write!(f, "none"), + CompressionCodec::Lz4 => write!(f, "lz4"), + CompressionCodec::Zstd(Some(level)) => write!(f, "zstd(level={level})"), + CompressionCodec::Zstd(None) => write!(f, "zstd"), + CompressionCodec::Gzip(Some(level)) => write!(f, "gzip(level={level})"), + CompressionCodec::Gzip(None) => write!(f, "gzip"), + CompressionCodec::Snappy => write!(f, "snappy"), + } + } } impl CompressionCodec { @@ -49,13 +95,17 @@ impl CompressionCodec { ErrorKind::FeatureUnsupported, "LZ4 decompression is not supported currently", )), - CompressionCodec::Zstd => Ok(zstd::stream::decode_all(&bytes[..])?), - CompressionCodec::Gzip => { + CompressionCodec::Zstd(_) => Ok(zstd::stream::decode_all(&bytes[..])?), + CompressionCodec::Gzip(_) => { let mut decoder = GzDecoder::new(&bytes[..]); let mut decompressed = Vec::new(); decoder.read_to_end(&mut decompressed)?; Ok(decompressed) } + CompressionCodec::Snappy => Err(Error::new( + ErrorKind::FeatureUnsupported, + "Snappy decompression is not supported currently", + )), } } @@ -66,19 +116,24 @@ impl CompressionCodec { ErrorKind::FeatureUnsupported, "LZ4 compression is not supported currently", )), - CompressionCodec::Zstd => { + CompressionCodec::Zstd(level) => { let writer = Vec::::new(); - let mut encoder = zstd::stream::Encoder::new(writer, 3)?; + let mut encoder = zstd::stream::Encoder::new(writer, level.unwrap_or(3) as i32)?; encoder.include_checksum(true)?; encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?; std::io::copy(&mut &bytes[..], &mut encoder)?; Ok(encoder.finish()?) } - CompressionCodec::Gzip => { - let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + CompressionCodec::Gzip(level) => { + let compression = Compression::new(level.unwrap_or(6).min(9) as u32); + let mut encoder = GzEncoder::new(Vec::new(), compression); encoder.write_all(&bytes)?; Ok(encoder.finish()?) } + CompressionCodec::Snappy => Err(Error::new( + ErrorKind::FeatureUnsupported, + "Snappy compression is not supported currently", + )), } } @@ -95,8 +150,8 @@ impl CompressionCodec { pub fn suffix(&self) -> Result<&'static str> { match self { CompressionCodec::None => Ok(""), - CompressionCodec::Gzip => Ok(".gz"), - codec @ (CompressionCodec::Lz4 | CompressionCodec::Zstd) => Err(Error::new( + CompressionCodec::Gzip(_) => Ok(".gz"), + codec @ (CompressionCodec::Lz4 | CompressionCodec::Zstd(_) | CompressionCodec::Snappy) => Err(Error::new( ErrorKind::FeatureUnsupported, format!("suffix not defined for {codec:?}"), )), @@ -123,7 +178,7 @@ mod tests { async fn test_compression_codec_compress() { let bytes_vec = [0_u8; 100].to_vec(); - let compression_codecs = [CompressionCodec::Zstd, CompressionCodec::Gzip]; + let compression_codecs = [CompressionCodec::Zstd(None), CompressionCodec::Gzip(None)]; for codec in compression_codecs { let compressed = codec.compress(bytes_vec.clone()).unwrap(); @@ -135,7 +190,7 @@ mod tests { #[tokio::test] async fn test_compression_codec_unsupported() { - let unsupported_codecs = [(CompressionCodec::Lz4, "LZ4")]; + let unsupported_codecs = [(CompressionCodec::Lz4, "LZ4"), (CompressionCodec::Snappy, "Snappy")]; let bytes_vec = [0_u8; 100].to_vec(); for (codec, name) in unsupported_codecs { @@ -155,16 +210,28 @@ mod tests { fn test_suffix() { // Test supported codecs assert_eq!(CompressionCodec::None.suffix().unwrap(), ""); - assert_eq!(CompressionCodec::Gzip.suffix().unwrap(), ".gz"); + assert_eq!(CompressionCodec::Gzip(None).suffix().unwrap(), ".gz"); // Test unsupported codecs return errors assert!(CompressionCodec::Lz4.suffix().is_err()); - assert!(CompressionCodec::Zstd.suffix().is_err()); + assert!(CompressionCodec::Zstd(None).suffix().is_err()); + assert!(CompressionCodec::Snappy.suffix().is_err()); let lz4_err = CompressionCodec::Lz4.suffix().unwrap_err(); assert!(lz4_err.to_string().contains("suffix not defined for Lz4")); - let zstd_err = CompressionCodec::Zstd.suffix().unwrap_err(); + let zstd_err = CompressionCodec::Zstd(None).suffix().unwrap_err(); assert!(zstd_err.to_string().contains("suffix not defined for Zstd")); } + + #[test] + fn test_display() { + assert_eq!(CompressionCodec::None.to_string(), "none"); + assert_eq!(CompressionCodec::Lz4.to_string(), "lz4"); + assert_eq!(CompressionCodec::Zstd(None).to_string(), "zstd"); + assert_eq!(CompressionCodec::Zstd(Some(3)).to_string(), "zstd(level=3)"); + assert_eq!(CompressionCodec::Gzip(None).to_string(), "gzip"); + assert_eq!(CompressionCodec::Gzip(Some(6)).to_string(), "gzip(level=6)"); + assert_eq!(CompressionCodec::Snappy.to_string(), "snappy"); + } } diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index 1d39cf249b..fff32b065e 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -985,6 +985,6 @@ mod tests { assert!(result.is_ok()); let metadata = result.unwrap(); assert_eq!(metadata.blobs.len(), 1); - assert_eq!(metadata.blobs[0].compression_codec, CompressionCodec::Gzip); + assert_eq!(metadata.blobs[0].compression_codec, CompressionCodec::Gzip(None)); } } diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs index 854d4070ff..6636fbf00a 100644 --- a/crates/iceberg/src/puffin/mod.rs +++ b/crates/iceberg/src/puffin/mod.rs @@ -30,26 +30,28 @@ pub use crate::compression::CompressionCodec; const SUPPORTED_PUFFIN_CODECS: &[CompressionCodec] = &[ CompressionCodec::None, CompressionCodec::Lz4, - CompressionCodec::Zstd, + CompressionCodec::Zstd(None), ]; /// Validates that the compression codec is supported for Puffin files. /// Returns an error if the codec is not supported. fn validate_puffin_compression(codec: CompressionCodec) -> Result<()> { - if !SUPPORTED_PUFFIN_CODECS.contains(&codec) { - let supported_names: Vec = SUPPORTED_PUFFIN_CODECS - .iter() - .map(|c| format!("{c:?}")) - .collect(); - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Compression codec {codec:?} is not supported for Puffin files. Only {} are supported.", - supported_names.join(", ") - ), - )); + match codec { + CompressionCodec::None | CompressionCodec::Lz4 | CompressionCodec::Zstd(_) => Ok(()), + other => { + let supported_names: Vec = SUPPORTED_PUFFIN_CODECS + .iter() + .map(|c| format!("{c}")) + .collect(); + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Compression codec {other} is not supported for Puffin files. Only {} are supported.", + supported_names.join(", ") + ), + )) + } } - Ok(()) } mod metadata; @@ -70,12 +72,13 @@ mod tests { #[test] fn test_puffin_codec_validation() { - // All codecs in SUPPORTED_PUFFIN_CODECS should be valid - for codec in SUPPORTED_PUFFIN_CODECS { - assert!(validate_puffin_compression(*codec).is_ok()); - } + // Supported codecs + assert!(validate_puffin_compression(CompressionCodec::None).is_ok()); + assert!(validate_puffin_compression(CompressionCodec::Lz4).is_ok()); + assert!(validate_puffin_compression(CompressionCodec::Zstd(None)).is_ok()); + assert!(validate_puffin_compression(CompressionCodec::Zstd(Some(3))).is_ok()); - // Gzip should not be supported for Puffin files - assert!(validate_puffin_compression(CompressionCodec::Gzip).is_err()); + // Unsupported codecs + assert!(validate_puffin_compression(CompressionCodec::Gzip(None)).is_err()); } } diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs index d272f02d41..c3d90e71f6 100644 --- a/crates/iceberg/src/puffin/reader.rs +++ b/crates/iceberg/src/puffin/reader.rs @@ -144,7 +144,7 @@ mod tests { sequence_number: 1, offset: 4, length: 10, - compression_codec: CompressionCodec::Gzip, + compression_codec: CompressionCodec::Gzip(None), properties: HashMap::new(), }; @@ -153,7 +153,7 @@ mod tests { assert!(result.is_err()); let err = result.unwrap_err(); assert_eq!(err.kind(), ErrorKind::DataInvalid); - assert!(err.to_string().contains("Gzip")); + assert!(err.to_string().contains("gzip")); assert!( err.to_string() .contains("is not supported for Puffin files") diff --git a/crates/iceberg/src/puffin/test_utils.rs b/crates/iceberg/src/puffin/test_utils.rs index 39fecc6f80..21e132b2f7 100644 --- a/crates/iceberg/src/puffin/test_utils.rs +++ b/crates/iceberg/src/puffin/test_utils.rs @@ -77,7 +77,7 @@ pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata { sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, offset: 4, length: 22, - compression_codec: CompressionCodec::Zstd, + compression_codec: CompressionCodec::Zstd(None), properties: HashMap::new(), } } @@ -134,7 +134,7 @@ pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata { sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, offset: 26, length: 77, - compression_codec: CompressionCodec::Zstd, + compression_codec: CompressionCodec::Zstd(None), properties: HashMap::new(), } } diff --git a/crates/iceberg/src/puffin/writer.rs b/crates/iceberg/src/puffin/writer.rs index 30b97f09dd..0038ea8cfe 100644 --- a/crates/iceberg/src/puffin/writer.rs +++ b/crates/iceberg/src/puffin/writer.rs @@ -251,7 +251,7 @@ mod tests { async fn test_write_zstd_compressed_metric_data() { let temp_dir = TempDir::new().unwrap(); let blobs = vec![blob_0(), blob_1()]; - let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Zstd); + let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Zstd(None)); let input_file = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) .await @@ -323,7 +323,7 @@ mod tests { async fn test_zstd_compressed_metric_data_is_bit_identical_to_java_generated_file() { let temp_dir = TempDir::new().unwrap(); let blobs = vec![blob_0(), blob_1()]; - let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Zstd); + let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Zstd(None)); assert_files_are_bit_identical( write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) @@ -338,14 +338,14 @@ mod tests { async fn test_gzip_compression_rejected() { let temp_dir = TempDir::new().unwrap(); let blobs = vec![blob_0()]; - let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Gzip); + let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Gzip(None)); let result = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()).await; assert!(result.is_err()); let err = result.unwrap_err(); assert_eq!(err.kind(), ErrorKind::DataInvalid); - assert!(err.to_string().contains("Gzip")); + assert!(err.to_string().contains("gzip")); assert!( err.to_string() .contains("is not supported for Puffin files") diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index b91599b74f..71e3eaba8d 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -457,7 +457,7 @@ impl TableMetadata { && metadata_content[0] == 0x1F && metadata_content[1] == 0x8B { - let decompressed_data = CompressionCodec::Gzip + let decompressed_data = CompressionCodec::Gzip(None) .decompress(metadata_content.to_vec()) .map_err(|e| { Error::new( @@ -499,7 +499,7 @@ impl TableMetadata { // Apply compression based on codec let data_to_write = match codec { - CompressionCodec::Gzip => codec.compress(json_data)?, + CompressionCodec::Gzip(_) => codec.compress(json_data)?, CompressionCodec::None => json_data, _ => { return Err(Error::new( @@ -3618,7 +3618,7 @@ mod tests { let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json"); let json = serde_json::to_string(&original_metadata).unwrap(); - let compressed = CompressionCodec::Gzip + let compressed = CompressionCodec::Gzip(None) .compress(json.into_bytes()) .expect("failed to compress metadata"); std::fs::write(&metadata_location, &compressed).expect("failed to write metadata"); diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 6e08318479..a934dc60ad 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -85,8 +85,8 @@ pub(crate) fn parse_metadata_file_compression( // Validate that only None and Gzip are used for metadata match codec { - CompressionCodec::None | CompressionCodec::Gzip => Ok(codec), - CompressionCodec::Lz4 | CompressionCodec::Zstd => Err(Error::new( + CompressionCodec::None | CompressionCodec::Gzip(_) => Ok(codec), + CompressionCodec::Lz4 | CompressionCodec::Zstd(_) | CompressionCodec::Snappy => Err(Error::new( ErrorKind::DataInvalid, format!( "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported for metadata files." @@ -305,7 +305,7 @@ mod tests { let table_properties = TableProperties::try_from(&props).unwrap(); assert_eq!( table_properties.metadata_compression_codec, - CompressionCodec::Gzip + CompressionCodec::Gzip(None) ); } @@ -332,7 +332,7 @@ mod tests { let table_properties = TableProperties::try_from(&props_upper).unwrap(); assert_eq!( table_properties.metadata_compression_codec, - CompressionCodec::Gzip + CompressionCodec::Gzip(None) ); // Test mixed case @@ -343,7 +343,7 @@ mod tests { let table_properties = TableProperties::try_from(&props_mixed).unwrap(); assert_eq!( table_properties.metadata_compression_codec, - CompressionCodec::Gzip + CompressionCodec::Gzip(None) ); // Test "NONE" should also be case-insensitive @@ -482,7 +482,7 @@ mod tests { )]); assert_eq!( parse_metadata_file_compression(&props).unwrap(), - CompressionCodec::Gzip + CompressionCodec::Gzip(None) ); // Test case insensitivity - "NONE" @@ -502,7 +502,7 @@ mod tests { )]); assert_eq!( parse_metadata_file_compression(&props).unwrap(), - CompressionCodec::Gzip + CompressionCodec::Gzip(None) ); // Test case insensitivity - "GzIp" @@ -512,7 +512,7 @@ mod tests { )]); assert_eq!( parse_metadata_file_compression(&props).unwrap(), - CompressionCodec::Gzip + CompressionCodec::Gzip(None) ); // Test default when property is missing From bf887e32e9043db1ad13d89eff4701ae1e162338 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 25 Mar 2026 23:12:28 +0000 Subject: [PATCH 2/5] fmt --- crates/iceberg/src/catalog/metadata_location.rs | 5 ++++- crates/iceberg/src/compression.rs | 16 ++++++++++------ crates/iceberg/src/puffin/metadata.rs | 5 ++++- crates/iceberg/src/puffin/writer.rs | 3 ++- crates/iceberg/src/spec/table_properties.rs | 14 ++++++++------ 5 files changed, 28 insertions(+), 15 deletions(-) diff --git a/crates/iceberg/src/catalog/metadata_location.rs b/crates/iceberg/src/catalog/metadata_location.rs index 18b795408b..73333c4f42 100644 --- a/crates/iceberg/src/catalog/metadata_location.rs +++ b/crates/iceberg/src/catalog/metadata_location.rs @@ -345,7 +345,10 @@ mod test { "/test/table/metadata/00005-81056704-ce5b-41c4-bb83-eb6408081af6.gz.metadata.json", ) .unwrap(); - assert_eq!(location_gzip.compression_codec, CompressionCodec::Gzip(None)); + assert_eq!( + location_gzip.compression_codec, + CompressionCodec::Gzip(None) + ); let next_gzip = location_gzip.with_next_version(); assert_eq!(next_gzip.compression_codec, CompressionCodec::Gzip(None)); diff --git a/crates/iceberg/src/compression.rs b/crates/iceberg/src/compression.rs index fb7fffc5b5..e8d225c456 100644 --- a/crates/iceberg/src/compression.rs +++ b/crates/iceberg/src/compression.rs @@ -65,10 +65,9 @@ impl<'de> Deserialize<'de> for CompressionCodec { "zstd" => Ok(CompressionCodec::Zstd(None)), "gzip" => Ok(CompressionCodec::Gzip(None)), "snappy" => Ok(CompressionCodec::Snappy), - other => Err(serde::de::Error::unknown_variant( - other, - &["none", "lz4", "zstd", "gzip", "snappy"], - )), + other => Err(serde::de::Error::unknown_variant(other, &[ + "none", "lz4", "zstd", "gzip", "snappy", + ])), } } } @@ -151,7 +150,9 @@ impl CompressionCodec { match self { CompressionCodec::None => Ok(""), CompressionCodec::Gzip(_) => Ok(".gz"), - codec @ (CompressionCodec::Lz4 | CompressionCodec::Zstd(_) | CompressionCodec::Snappy) => Err(Error::new( + codec @ (CompressionCodec::Lz4 + | CompressionCodec::Zstd(_) + | CompressionCodec::Snappy) => Err(Error::new( ErrorKind::FeatureUnsupported, format!("suffix not defined for {codec:?}"), )), @@ -190,7 +191,10 @@ mod tests { #[tokio::test] async fn test_compression_codec_unsupported() { - let unsupported_codecs = [(CompressionCodec::Lz4, "LZ4"), (CompressionCodec::Snappy, "Snappy")]; + let unsupported_codecs = [ + (CompressionCodec::Lz4, "LZ4"), + (CompressionCodec::Snappy, "Snappy"), + ]; let bytes_vec = [0_u8; 100].to_vec(); for (codec, name) in unsupported_codecs { diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index fff32b065e..76e39b8f01 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -985,6 +985,9 @@ mod tests { assert!(result.is_ok()); let metadata = result.unwrap(); assert_eq!(metadata.blobs.len(), 1); - assert_eq!(metadata.blobs[0].compression_codec, CompressionCodec::Gzip(None)); + assert_eq!( + metadata.blobs[0].compression_codec, + CompressionCodec::Gzip(None) + ); } } diff --git a/crates/iceberg/src/puffin/writer.rs b/crates/iceberg/src/puffin/writer.rs index 0038ea8cfe..718ee2740c 100644 --- a/crates/iceberg/src/puffin/writer.rs +++ b/crates/iceberg/src/puffin/writer.rs @@ -251,7 +251,8 @@ mod tests { async fn test_write_zstd_compressed_metric_data() { let temp_dir = TempDir::new().unwrap(); let blobs = vec![blob_0(), blob_1()]; - let blobs_with_compression = blobs_with_compression(blobs.clone(), CompressionCodec::Zstd(None)); + let blobs_with_compression = + blobs_with_compression(blobs.clone(), CompressionCodec::Zstd(None)); let input_file = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) .await diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index a934dc60ad..323f204c41 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -86,12 +86,14 @@ pub(crate) fn parse_metadata_file_compression( // Validate that only None and Gzip are used for metadata match codec { CompressionCodec::None | CompressionCodec::Gzip(_) => Ok(codec), - CompressionCodec::Lz4 | CompressionCodec::Zstd(_) | CompressionCodec::Snappy => Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported for metadata files." - ), - )), + CompressionCodec::Lz4 | CompressionCodec::Zstd(_) | CompressionCodec::Snappy => { + Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported for metadata files." + ), + )) + } } } From d03b6463ee9cf13b2f0224870e39d838de08b694 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 25 Mar 2026 23:28:21 +0000 Subject: [PATCH 3/5] display improvement --- crates/iceberg/src/compression.rs | 28 ++++++++++++++-------------- crates/iceberg/src/puffin/reader.rs | 2 +- crates/iceberg/src/puffin/writer.rs | 2 +- 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/crates/iceberg/src/compression.rs b/crates/iceberg/src/compression.rs index e8d225c456..fb82bc09b8 100644 --- a/crates/iceberg/src/compression.rs +++ b/crates/iceberg/src/compression.rs @@ -75,13 +75,13 @@ impl<'de> Deserialize<'de> for CompressionCodec { impl fmt::Display for CompressionCodec { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - CompressionCodec::None => write!(f, "none"), - CompressionCodec::Lz4 => write!(f, "lz4"), - CompressionCodec::Zstd(Some(level)) => write!(f, "zstd(level={level})"), - CompressionCodec::Zstd(None) => write!(f, "zstd"), - CompressionCodec::Gzip(Some(level)) => write!(f, "gzip(level={level})"), - CompressionCodec::Gzip(None) => write!(f, "gzip"), - CompressionCodec::Snappy => write!(f, "snappy"), + CompressionCodec::None => write!(f, "None"), + CompressionCodec::Lz4 => write!(f, "Lz4"), + CompressionCodec::Zstd(None) => write!(f, "Zstd"), + CompressionCodec::Zstd(Some(level)) => write!(f, "Zstd(level={level})"), + CompressionCodec::Gzip(None) => write!(f, "Gzip"), + CompressionCodec::Gzip(Some(level)) => write!(f, "Gzip(level={level})"), + CompressionCodec::Snappy => write!(f, "Snappy"), } } } @@ -230,12 +230,12 @@ mod tests { #[test] fn test_display() { - assert_eq!(CompressionCodec::None.to_string(), "none"); - assert_eq!(CompressionCodec::Lz4.to_string(), "lz4"); - assert_eq!(CompressionCodec::Zstd(None).to_string(), "zstd"); - assert_eq!(CompressionCodec::Zstd(Some(3)).to_string(), "zstd(level=3)"); - assert_eq!(CompressionCodec::Gzip(None).to_string(), "gzip"); - assert_eq!(CompressionCodec::Gzip(Some(6)).to_string(), "gzip(level=6)"); - assert_eq!(CompressionCodec::Snappy.to_string(), "snappy"); + assert_eq!(CompressionCodec::None.to_string(), "None"); + assert_eq!(CompressionCodec::Lz4.to_string(), "Lz4"); + assert_eq!(CompressionCodec::Zstd(None).to_string(), "Zstd"); + assert_eq!(CompressionCodec::Zstd(Some(3)).to_string(), "Zstd(level=3)"); + assert_eq!(CompressionCodec::Gzip(None).to_string(), "Gzip"); + assert_eq!(CompressionCodec::Gzip(Some(6)).to_string(), "Gzip(level=6)"); + assert_eq!(CompressionCodec::Snappy.to_string(), "Snappy"); } } diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs index c3d90e71f6..501d0241fa 100644 --- a/crates/iceberg/src/puffin/reader.rs +++ b/crates/iceberg/src/puffin/reader.rs @@ -153,7 +153,7 @@ mod tests { assert!(result.is_err()); let err = result.unwrap_err(); assert_eq!(err.kind(), ErrorKind::DataInvalid); - assert!(err.to_string().contains("gzip")); + assert!(err.to_string().contains("Gzip")); assert!( err.to_string() .contains("is not supported for Puffin files") diff --git a/crates/iceberg/src/puffin/writer.rs b/crates/iceberg/src/puffin/writer.rs index 718ee2740c..6355337f33 100644 --- a/crates/iceberg/src/puffin/writer.rs +++ b/crates/iceberg/src/puffin/writer.rs @@ -346,7 +346,7 @@ mod tests { assert!(result.is_err()); let err = result.unwrap_err(); assert_eq!(err.kind(), ErrorKind::DataInvalid); - assert!(err.to_string().contains("gzip")); + assert!(err.to_string().contains("Gzip")); assert!( err.to_string() .contains("is not supported for Puffin files") From d19008d2b360723bd93816341b9f9e9a994395e5 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Fri, 27 Mar 2026 22:25:32 +0000 Subject: [PATCH 4/5] address feedback --- crates/iceberg/src/compression.rs | 8 +++++--- crates/iceberg/src/spec/table_properties.rs | 14 ++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/iceberg/src/compression.rs b/crates/iceberg/src/compression.rs index fb82bc09b8..6d39ce11d1 100644 --- a/crates/iceberg/src/compression.rs +++ b/crates/iceberg/src/compression.rs @@ -35,9 +35,10 @@ pub enum CompressionCodec { None, /// LZ4 single compression frame with content size present Lz4, - /// Zstandard single compression frame with content size present. Optional level 0–22. + /// Zstandard single compression frame with content size present. Optional level 0–22, + /// where 0 means default compression level (not no compression, unlike Gzip). Zstd(Option), - /// Gzip compression. Optional level 0–9. + /// Gzip compression. Optional level 0–9, where 0 means no compression. Gzip(Option), /// Snappy compression Snappy, @@ -124,7 +125,8 @@ impl CompressionCodec { Ok(encoder.finish()?) } CompressionCodec::Gzip(level) => { - let compression = Compression::new(level.unwrap_or(6).min(9) as u32); + let compression = + level.map_or_else(Compression::default, |l| Compression::new(l.min(9) as u32)); let mut encoder = GzEncoder::new(Vec::new(), compression); encoder.write_all(&bytes)?; Ok(encoder.finish()?) diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index 323f204c41..ca25cfb1ca 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -86,14 +86,12 @@ pub(crate) fn parse_metadata_file_compression( // Validate that only None and Gzip are used for metadata match codec { CompressionCodec::None | CompressionCodec::Gzip(_) => Ok(codec), - CompressionCodec::Lz4 | CompressionCodec::Zstd(_) | CompressionCodec::Snappy => { - Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported for metadata files." - ), - )) - } + _ => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported for metadata files." + ), + )), } } From 76a4a5bc4c210f390a568bc421989045cd36eeeb Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Mon, 30 Mar 2026 22:25:03 +0000 Subject: [PATCH 5/5] address comments --- .../iceberg/src/catalog/metadata_location.rs | 18 ++-- crates/iceberg/src/compression.rs | 91 +++++++++++++------ crates/iceberg/src/puffin/metadata.rs | 2 +- crates/iceberg/src/puffin/mod.rs | 36 +++----- crates/iceberg/src/puffin/reader.rs | 4 +- crates/iceberg/src/puffin/test_utils.rs | 4 +- crates/iceberg/src/puffin/writer.rs | 10 +- crates/iceberg/src/spec/table_metadata.rs | 4 +- crates/iceberg/src/spec/table_properties.rs | 20 ++-- 9 files changed, 112 insertions(+), 77 deletions(-) diff --git a/crates/iceberg/src/catalog/metadata_location.rs b/crates/iceberg/src/catalog/metadata_location.rs index 73333c4f42..acd041d5e1 100644 --- a/crates/iceberg/src/catalog/metadata_location.rs +++ b/crates/iceberg/src/catalog/metadata_location.rs @@ -114,9 +114,9 @@ impl MetadataLocation { ))?; // Check for compression suffix (e.g., .gz) - let gzip_suffix = CompressionCodec::Gzip(None).suffix()?; + let gzip_suffix = CompressionCodec::gzip_default().suffix()?; let (stripped, compression_codec) = if let Some(s) = stripped.strip_suffix(gzip_suffix) { - (s, CompressionCodec::Gzip(None)) + (s, CompressionCodec::gzip_default()) } else { (stripped, CompressionCodec::None) }; @@ -261,7 +261,7 @@ mod test { table_location: "/abc".to_string(), version: 1234567, id: Uuid::from_str("2cd22b57-5127-4198-92ba-e4e67c79821b").unwrap(), - compression_codec: CompressionCodec::Gzip(None), + compression_codec: CompressionCodec::gzip_default(), }), ), // Negative version @@ -347,11 +347,14 @@ mod test { .unwrap(); assert_eq!( location_gzip.compression_codec, - CompressionCodec::Gzip(None) + CompressionCodec::gzip_default() ); let next_gzip = location_gzip.with_next_version(); - assert_eq!(next_gzip.compression_codec, CompressionCodec::Gzip(None)); + assert_eq!( + next_gzip.compression_codec, + CompressionCodec::gzip_default() + ); assert_eq!(next_gzip.version, 6); } @@ -372,7 +375,10 @@ mod test { ); let metadata_gzip = create_test_metadata(props_gzip); let updated_gzip = location.with_new_metadata(&metadata_gzip); - assert_eq!(updated_gzip.compression_codec, CompressionCodec::Gzip(None)); + assert_eq!( + updated_gzip.compression_codec, + CompressionCodec::gzip_default() + ); assert_eq!(updated_gzip.version, 0); assert_eq!( updated_gzip.to_string(), diff --git a/crates/iceberg/src/compression.rs b/crates/iceberg/src/compression.rs index 6d39ce11d1..929d9226e7 100644 --- a/crates/iceberg/src/compression.rs +++ b/crates/iceberg/src/compression.rs @@ -27,6 +27,13 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::{Error, ErrorKind, Result}; +/// Default compression level for Zstandard (zstd). +const ZSTD_DEFAULT_LEVEL: u8 = 3; +/// Default compression level for Gzip. +const GZIP_DEFAULT_LEVEL: u8 = 6; +/// Maximum compression level for Gzip. +const GZIP_MAX_LEVEL: u8 = 9; + /// Data compression formats #[derive(Debug, PartialEq, Eq, Clone, Copy, Default)] pub enum CompressionCodec { @@ -35,25 +42,47 @@ pub enum CompressionCodec { None, /// LZ4 single compression frame with content size present Lz4, - /// Zstandard single compression frame with content size present. Optional level 0–22, - /// where 0 means default compression level (not no compression, unlike Gzip). - Zstd(Option), - /// Gzip compression. Optional level 0–9, where 0 means no compression. - Gzip(Option), + /// Zstandard single compression frame with content size present. + /// Level range is 0–22, where 0 means default compression level (not no compression). + /// Use [`CompressionCodec::zstd_default`] to construct with the default level. + Zstd(u8), + /// Gzip compression. Level range is 0–9, where 0 means no compression. + /// Use [`CompressionCodec::gzip_default`] to construct with the default level. + Gzip(u8), /// Snappy compression Snappy, } -impl Serialize for CompressionCodec { - fn serialize(&self, serializer: S) -> std::result::Result { - let s = match self { +impl CompressionCodec { + /// Returns a Zstd codec with the default compression level. + pub const fn zstd_default() -> Self { + CompressionCodec::Zstd(ZSTD_DEFAULT_LEVEL) + } + + /// Returns a Gzip codec with the default compression level. + pub const fn gzip_default() -> Self { + CompressionCodec::Gzip(GZIP_DEFAULT_LEVEL) + } + + /// Returns the codec name as used in serialization and error messages. + pub fn name(&self) -> &'static str { + match self { CompressionCodec::None => "none", CompressionCodec::Lz4 => "lz4", CompressionCodec::Zstd(_) => "zstd", CompressionCodec::Gzip(_) => "gzip", CompressionCodec::Snappy => "snappy", - }; - serializer.serialize_str(s) + } + } +} + +// Note: serialize/deserialize do not round-trip the compression level. Iceberg configuration +// only the codec name (e.g. "zstd"), not the level, so deserialization always produces the +// default level. A `Zstd(5)` written to metadata will be read back as `Zstd(3)`. Some +// compression configuration (e.g. Avro metadata) has a separate level field alongside the codec name. +impl Serialize for CompressionCodec { + fn serialize(&self, serializer: S) -> std::result::Result { + serializer.serialize_str(self.name()) } } @@ -63,8 +92,8 @@ impl<'de> Deserialize<'de> for CompressionCodec { match s.to_lowercase().as_str() { "none" => Ok(CompressionCodec::None), "lz4" => Ok(CompressionCodec::Lz4), - "zstd" => Ok(CompressionCodec::Zstd(None)), - "gzip" => Ok(CompressionCodec::Gzip(None)), + "zstd" => Ok(CompressionCodec::zstd_default()), + "gzip" => Ok(CompressionCodec::gzip_default()), "snappy" => Ok(CompressionCodec::Snappy), other => Err(serde::de::Error::unknown_variant(other, &[ "none", "lz4", "zstd", "gzip", "snappy", @@ -78,10 +107,8 @@ impl fmt::Display for CompressionCodec { match self { CompressionCodec::None => write!(f, "None"), CompressionCodec::Lz4 => write!(f, "Lz4"), - CompressionCodec::Zstd(None) => write!(f, "Zstd"), - CompressionCodec::Zstd(Some(level)) => write!(f, "Zstd(level={level})"), - CompressionCodec::Gzip(None) => write!(f, "Gzip"), - CompressionCodec::Gzip(Some(level)) => write!(f, "Gzip(level={level})"), + CompressionCodec::Zstd(level) => write!(f, "Zstd(level={level})"), + CompressionCodec::Gzip(level) => write!(f, "Gzip(level={level})"), CompressionCodec::Snappy => write!(f, "Snappy"), } } @@ -118,15 +145,14 @@ impl CompressionCodec { )), CompressionCodec::Zstd(level) => { let writer = Vec::::new(); - let mut encoder = zstd::stream::Encoder::new(writer, level.unwrap_or(3) as i32)?; + let mut encoder = zstd::stream::Encoder::new(writer, *level as i32)?; encoder.include_checksum(true)?; encoder.set_pledged_src_size(Some(bytes.len().try_into()?))?; std::io::copy(&mut &bytes[..], &mut encoder)?; Ok(encoder.finish()?) } CompressionCodec::Gzip(level) => { - let compression = - level.map_or_else(Compression::default, |l| Compression::new(l.min(9) as u32)); + let compression = Compression::new((*level).min(GZIP_MAX_LEVEL) as u32); let mut encoder = GzEncoder::new(Vec::new(), compression); encoder.write_all(&bytes)?; Ok(encoder.finish()?) @@ -181,7 +207,10 @@ mod tests { async fn test_compression_codec_compress() { let bytes_vec = [0_u8; 100].to_vec(); - let compression_codecs = [CompressionCodec::Zstd(None), CompressionCodec::Gzip(None)]; + let compression_codecs = [ + CompressionCodec::zstd_default(), + CompressionCodec::gzip_default(), + ]; for codec in compression_codecs { let compressed = codec.compress(bytes_vec.clone()).unwrap(); @@ -214,19 +243,17 @@ mod tests { #[test] fn test_suffix() { - // Test supported codecs assert_eq!(CompressionCodec::None.suffix().unwrap(), ""); - assert_eq!(CompressionCodec::Gzip(None).suffix().unwrap(), ".gz"); + assert_eq!(CompressionCodec::gzip_default().suffix().unwrap(), ".gz"); - // Test unsupported codecs return errors assert!(CompressionCodec::Lz4.suffix().is_err()); - assert!(CompressionCodec::Zstd(None).suffix().is_err()); + assert!(CompressionCodec::zstd_default().suffix().is_err()); assert!(CompressionCodec::Snappy.suffix().is_err()); let lz4_err = CompressionCodec::Lz4.suffix().unwrap_err(); assert!(lz4_err.to_string().contains("suffix not defined for Lz4")); - let zstd_err = CompressionCodec::Zstd(None).suffix().unwrap_err(); + let zstd_err = CompressionCodec::zstd_default().suffix().unwrap_err(); assert!(zstd_err.to_string().contains("suffix not defined for Zstd")); } @@ -234,10 +261,16 @@ mod tests { fn test_display() { assert_eq!(CompressionCodec::None.to_string(), "None"); assert_eq!(CompressionCodec::Lz4.to_string(), "Lz4"); - assert_eq!(CompressionCodec::Zstd(None).to_string(), "Zstd"); - assert_eq!(CompressionCodec::Zstd(Some(3)).to_string(), "Zstd(level=3)"); - assert_eq!(CompressionCodec::Gzip(None).to_string(), "Gzip"); - assert_eq!(CompressionCodec::Gzip(Some(6)).to_string(), "Gzip(level=6)"); + assert_eq!( + CompressionCodec::zstd_default().to_string(), + "Zstd(level=3)" + ); + assert_eq!(CompressionCodec::Zstd(5).to_string(), "Zstd(level=5)"); + assert_eq!( + CompressionCodec::gzip_default().to_string(), + "Gzip(level=6)" + ); + assert_eq!(CompressionCodec::Gzip(9).to_string(), "Gzip(level=9)"); assert_eq!(CompressionCodec::Snappy.to_string(), "Snappy"); } } diff --git a/crates/iceberg/src/puffin/metadata.rs b/crates/iceberg/src/puffin/metadata.rs index 76e39b8f01..e2dfc10c23 100644 --- a/crates/iceberg/src/puffin/metadata.rs +++ b/crates/iceberg/src/puffin/metadata.rs @@ -987,7 +987,7 @@ mod tests { assert_eq!(metadata.blobs.len(), 1); assert_eq!( metadata.blobs[0].compression_codec, - CompressionCodec::Gzip(None) + CompressionCodec::gzip_default() ); } } diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs index 6636fbf00a..0e054cac51 100644 --- a/crates/iceberg/src/puffin/mod.rs +++ b/crates/iceberg/src/puffin/mod.rs @@ -26,31 +26,21 @@ pub use blob::{APACHE_DATASKETCHES_THETA_V1, Blob, DELETION_VECTOR_V1}; pub use crate::compression::CompressionCodec; -/// Compression codecs supported by the Puffin spec. -const SUPPORTED_PUFFIN_CODECS: &[CompressionCodec] = &[ - CompressionCodec::None, - CompressionCodec::Lz4, - CompressionCodec::Zstd(None), -]; - /// Validates that the compression codec is supported for Puffin files. /// Returns an error if the codec is not supported. fn validate_puffin_compression(codec: CompressionCodec) -> Result<()> { match codec { CompressionCodec::None | CompressionCodec::Lz4 | CompressionCodec::Zstd(_) => Ok(()), - other => { - let supported_names: Vec = SUPPORTED_PUFFIN_CODECS - .iter() - .map(|c| format!("{c}")) - .collect(); - Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Compression codec {other} is not supported for Puffin files. Only {} are supported.", - supported_names.join(", ") - ), - )) - } + other => Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Compression codec {} is not supported for Puffin files. Only {}, {}, and {} are supported.", + other.name(), + CompressionCodec::None.name(), + CompressionCodec::Lz4.name(), + CompressionCodec::zstd_default().name() + ), + )), } } @@ -75,10 +65,10 @@ mod tests { // Supported codecs assert!(validate_puffin_compression(CompressionCodec::None).is_ok()); assert!(validate_puffin_compression(CompressionCodec::Lz4).is_ok()); - assert!(validate_puffin_compression(CompressionCodec::Zstd(None)).is_ok()); - assert!(validate_puffin_compression(CompressionCodec::Zstd(Some(3))).is_ok()); + assert!(validate_puffin_compression(CompressionCodec::zstd_default()).is_ok()); + assert!(validate_puffin_compression(CompressionCodec::Zstd(5)).is_ok()); // Unsupported codecs - assert!(validate_puffin_compression(CompressionCodec::Gzip(None)).is_err()); + assert!(validate_puffin_compression(CompressionCodec::gzip_default()).is_err()); } } diff --git a/crates/iceberg/src/puffin/reader.rs b/crates/iceberg/src/puffin/reader.rs index 501d0241fa..0aced4186f 100644 --- a/crates/iceberg/src/puffin/reader.rs +++ b/crates/iceberg/src/puffin/reader.rs @@ -144,7 +144,7 @@ mod tests { sequence_number: 1, offset: 4, length: 10, - compression_codec: CompressionCodec::Gzip(None), + compression_codec: CompressionCodec::gzip_default(), properties: HashMap::new(), }; @@ -153,7 +153,7 @@ mod tests { assert!(result.is_err()); let err = result.unwrap_err(); assert_eq!(err.kind(), ErrorKind::DataInvalid); - assert!(err.to_string().contains("Gzip")); + assert!(err.to_string().contains("gzip")); assert!( err.to_string() .contains("is not supported for Puffin files") diff --git a/crates/iceberg/src/puffin/test_utils.rs b/crates/iceberg/src/puffin/test_utils.rs index 21e132b2f7..e0844e2002 100644 --- a/crates/iceberg/src/puffin/test_utils.rs +++ b/crates/iceberg/src/puffin/test_utils.rs @@ -77,7 +77,7 @@ pub(crate) fn zstd_compressed_metric_blob_0_metadata() -> BlobMetadata { sequence_number: METRIC_BLOB_0_SEQUENCE_NUMBER, offset: 4, length: 22, - compression_codec: CompressionCodec::Zstd(None), + compression_codec: CompressionCodec::zstd_default(), properties: HashMap::new(), } } @@ -134,7 +134,7 @@ pub(crate) fn zstd_compressed_metric_blob_1_metadata() -> BlobMetadata { sequence_number: METRIC_BLOB_1_SEQUENCE_NUMBER, offset: 26, length: 77, - compression_codec: CompressionCodec::Zstd(None), + compression_codec: CompressionCodec::zstd_default(), properties: HashMap::new(), } } diff --git a/crates/iceberg/src/puffin/writer.rs b/crates/iceberg/src/puffin/writer.rs index 6355337f33..4af4970b04 100644 --- a/crates/iceberg/src/puffin/writer.rs +++ b/crates/iceberg/src/puffin/writer.rs @@ -252,7 +252,7 @@ mod tests { let temp_dir = TempDir::new().unwrap(); let blobs = vec![blob_0(), blob_1()]; let blobs_with_compression = - blobs_with_compression(blobs.clone(), CompressionCodec::Zstd(None)); + blobs_with_compression(blobs.clone(), CompressionCodec::zstd_default()); let input_file = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) .await @@ -324,7 +324,8 @@ mod tests { async fn test_zstd_compressed_metric_data_is_bit_identical_to_java_generated_file() { let temp_dir = TempDir::new().unwrap(); let blobs = vec![blob_0(), blob_1()]; - let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Zstd(None)); + let blobs_with_compression = + blobs_with_compression(blobs, CompressionCodec::zstd_default()); assert_files_are_bit_identical( write_puffin_file(&temp_dir, blobs_with_compression, file_properties()) @@ -339,14 +340,15 @@ mod tests { async fn test_gzip_compression_rejected() { let temp_dir = TempDir::new().unwrap(); let blobs = vec![blob_0()]; - let blobs_with_compression = blobs_with_compression(blobs, CompressionCodec::Gzip(None)); + let blobs_with_compression = + blobs_with_compression(blobs, CompressionCodec::gzip_default()); let result = write_puffin_file(&temp_dir, blobs_with_compression, file_properties()).await; assert!(result.is_err()); let err = result.unwrap_err(); assert_eq!(err.kind(), ErrorKind::DataInvalid); - assert!(err.to_string().contains("Gzip")); + assert!(err.to_string().contains("gzip")); assert!( err.to_string() .contains("is not supported for Puffin files") diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 71e3eaba8d..524139d434 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -457,7 +457,7 @@ impl TableMetadata { && metadata_content[0] == 0x1F && metadata_content[1] == 0x8B { - let decompressed_data = CompressionCodec::Gzip(None) + let decompressed_data = CompressionCodec::gzip_default() .decompress(metadata_content.to_vec()) .map_err(|e| { Error::new( @@ -3618,7 +3618,7 @@ mod tests { let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json"); let json = serde_json::to_string(&original_metadata).unwrap(); - let compressed = CompressionCodec::Gzip(None) + let compressed = CompressionCodec::gzip_default() .compress(json.into_bytes()) .expect("failed to compress metadata"); std::fs::write(&metadata_location, &compressed).expect("failed to write metadata"); diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index ca25cfb1ca..d57b9a1583 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -78,7 +78,9 @@ pub(crate) fn parse_metadata_file_compression( Error::new( ErrorKind::DataInvalid, format!( - "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported." + "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported.", + CompressionCodec::None.name(), + CompressionCodec::gzip_default().name() ), ) })?; @@ -89,7 +91,9 @@ pub(crate) fn parse_metadata_file_compression( _ => Err(Error::new( ErrorKind::DataInvalid, format!( - "Invalid metadata compression codec: {value}. Only 'none' and 'gzip' are supported for metadata files." + "Invalid metadata compression codec: {value}. Only '{}' and '{}' are supported for metadata files.", + CompressionCodec::None.name(), + CompressionCodec::gzip_default().name() ), )), } @@ -305,7 +309,7 @@ mod tests { let table_properties = TableProperties::try_from(&props).unwrap(); assert_eq!( table_properties.metadata_compression_codec, - CompressionCodec::Gzip(None) + CompressionCodec::gzip_default() ); } @@ -332,7 +336,7 @@ mod tests { let table_properties = TableProperties::try_from(&props_upper).unwrap(); assert_eq!( table_properties.metadata_compression_codec, - CompressionCodec::Gzip(None) + CompressionCodec::gzip_default() ); // Test mixed case @@ -343,7 +347,7 @@ mod tests { let table_properties = TableProperties::try_from(&props_mixed).unwrap(); assert_eq!( table_properties.metadata_compression_codec, - CompressionCodec::Gzip(None) + CompressionCodec::gzip_default() ); // Test "NONE" should also be case-insensitive @@ -482,7 +486,7 @@ mod tests { )]); assert_eq!( parse_metadata_file_compression(&props).unwrap(), - CompressionCodec::Gzip(None) + CompressionCodec::gzip_default() ); // Test case insensitivity - "NONE" @@ -502,7 +506,7 @@ mod tests { )]); assert_eq!( parse_metadata_file_compression(&props).unwrap(), - CompressionCodec::Gzip(None) + CompressionCodec::gzip_default() ); // Test case insensitivity - "GzIp" @@ -512,7 +516,7 @@ mod tests { )]); assert_eq!( parse_metadata_file_compression(&props).unwrap(), - CompressionCodec::Gzip(None) + CompressionCodec::gzip_default() ); // Test default when property is missing