diff --git a/Cargo.lock b/Cargo.lock index 5110d5a480..95de2c4a41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3348,7 +3348,9 @@ dependencies = [ "futures", "iceberg_test_utils", "itertools 0.13.0", + "log", "minijinja", + "miniz_oxide", "mockall", "moka", "murmur3", diff --git a/Cargo.toml b/Cargo.toml index 778e69c9d9..a4fd77a789 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,7 +43,7 @@ rust-version = "1.92" aes = { version = "0.8", features = ["zeroize"] } aes-gcm = "0.10" anyhow = "1.0.72" -apache-avro = { version = "0.21", features = ["zstandard"] } +apache-avro = { version = "0.21", features = ["zstandard", "snappy"] } array-init = "2" arrow-arith = "58" arrow-array = "58" @@ -98,6 +98,7 @@ log = "0.4.28" metainfo = "0.7.14" mimalloc = "0.1.46" minijinja = "2.12.0" +miniz_oxide = "0.8" mockall = "0.13.1" mockito = "1" motore-macros = "0.4.3" diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index aa1d0cd4a5..76b871f616 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -58,6 +58,8 @@ flate2 = { workspace = true } fnv = { workspace = true } futures = { workspace = true } itertools = { workspace = true } +log = { workspace = true } +miniz_oxide = { workspace = true } moka = { version = "0.12.10", features = ["future"] } murmur3 = { workspace = true } once_cell = { workspace = true } @@ -89,6 +91,7 @@ mockall = { workspace = true } pretty_assertions = { workspace = true } rand = { workspace = true } regex = { workspace = true } +rstest = { workspace = true } tempfile = { workspace = true } minijinja = { workspace = true } diff --git a/crates/iceberg/src/compression.rs b/crates/iceberg/src/compression.rs index 929d9226e7..1cd2578612 100644 --- a/crates/iceberg/src/compression.rs +++ b/crates/iceberg/src/compression.rs @@ -173,7 +173,7 @@ impl CompressionCodec { /// /// # Errors /// - /// Returns an error for Lz4 and Zstd as they are not fully supported. + /// Returns an error for Lz4, Zstd, and Snappy as they are not fully supported. pub fn suffix(&self) -> Result<&'static str> { match self { CompressionCodec::None => Ok(""), diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 8881471ae8..8ccc0f19dd 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -192,6 +192,7 @@ mod tests { use super::*; use crate::TableIdent; + use crate::compression::CompressionCodec; use crate::io::{FileIO, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry, @@ -272,6 +273,7 @@ mod tests { None, current_schema.clone(), current_partition_spec.as_ref().clone(), + CompressionCodec::None, ) .build_v2_data(); writer @@ -304,6 +306,7 @@ mod tests { current_snapshot.snapshot_id(), current_snapshot.parent_snapshot_id(), current_snapshot.sequence_number(), + CompressionCodec::None, ); manifest_list_write .add_manifests(vec![data_file_manifest].into_iter()) diff --git a/crates/iceberg/src/puffin/mod.rs b/crates/iceberg/src/puffin/mod.rs index 0e054cac51..84d2de0b72 100644 --- a/crates/iceberg/src/puffin/mod.rs +++ b/crates/iceberg/src/puffin/mod.rs @@ -66,9 +66,10 @@ mod tests { assert!(validate_puffin_compression(CompressionCodec::None).is_ok()); assert!(validate_puffin_compression(CompressionCodec::Lz4).is_ok()); assert!(validate_puffin_compression(CompressionCodec::zstd_default()).is_ok()); - assert!(validate_puffin_compression(CompressionCodec::Zstd(5)).is_ok()); + assert!(validate_puffin_compression(CompressionCodec::Zstd(3)).is_ok()); // Unsupported codecs assert!(validate_puffin_compression(CompressionCodec::gzip_default()).is_err()); + assert!(validate_puffin_compression(CompressionCodec::Snappy).is_err()); } } diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index e52b3bdeae..0e6636eaeb 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -565,7 +565,7 @@ pub mod tests { //! shared tests for the table scan API #![allow(missing_docs)] - use std::collections::HashMap; + use std::collections::{HashMap, HashSet}; use std::fs; use std::fs::File; use std::sync::Arc; @@ -586,6 +586,7 @@ pub mod tests { use crate::TableIdent; use crate::arrow::ArrowReaderBuilder; + use crate::compression::CompressionCodec; use crate::expr::{BoundPredicate, Reference}; use crate::io::{FileIO, OutputFile}; use crate::metadata_columns::RESERVED_COL_NAME_FILE; @@ -756,6 +757,7 @@ pub mod tests { None, current_schema.clone(), current_partition_spec.as_ref().clone(), + CompressionCodec::None, ) .build_v2_data(); writer @@ -833,6 +835,7 @@ pub mod tests { current_snapshot.snapshot_id(), current_snapshot.parent_snapshot_id(), current_snapshot.sequence_number(), + CompressionCodec::None, ); manifest_list_write .add_manifests(vec![data_file_manifest].into_iter()) @@ -980,6 +983,7 @@ pub mod tests { None, current_schema.clone(), current_partition_spec.as_ref().clone(), + CompressionCodec::None, ) .build_v2_data(); @@ -1064,6 +1068,7 @@ pub mod tests { current_snapshot.snapshot_id(), current_snapshot.parent_snapshot_id(), current_snapshot.sequence_number(), + CompressionCodec::None, ); manifest_list_write .add_manifests(vec![data_file_manifest].into_iter()) @@ -1086,6 +1091,7 @@ pub mod tests { None, current_schema.clone(), current_partition_spec.as_ref().clone(), + CompressionCodec::None, ) .build_v2_data(); @@ -1121,6 +1127,7 @@ pub mod tests { None, current_schema.clone(), current_partition_spec.as_ref().clone(), + CompressionCodec::None, ) .build_v2_deletes(); @@ -1155,6 +1162,7 @@ pub mod tests { current_snapshot.snapshot_id(), current_snapshot.parent_snapshot_id(), current_snapshot.sequence_number(), + CompressionCodec::None, ); manifest_list_write .add_manifests(vec![data_manifest, delete_manifest].into_iter()) @@ -1812,8 +1820,6 @@ pub mod tests { #[tokio::test] async fn test_select_with_file_column() { - use arrow_array::cast::AsArray; - let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; @@ -1935,8 +1941,6 @@ pub mod tests { #[tokio::test] async fn test_file_column_with_multiple_files() { - use std::collections::HashSet; - let mut fixture = TableTestFixture::new(); fixture.setup_manifest_files().await; diff --git a/crates/iceberg/src/spec/avro_util.rs b/crates/iceberg/src/spec/avro_util.rs new file mode 100644 index 0000000000..6c05b9b40b --- /dev/null +++ b/crates/iceberg/src/spec/avro_util.rs @@ -0,0 +1,153 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for working with Apache Avro in Iceberg. + +use apache_avro::{Codec, DeflateSettings, ZstandardSettings}; +use miniz_oxide::deflate::CompressionLevel; + +use crate::compression::CompressionCodec; +use crate::{Error, ErrorKind, Result}; + +/// Codec name for uncompressed (Avro-specific; maps to [`CompressionCodec::None`]) +const CODEC_UNCOMPRESSED: &str = "uncompressed"; + +/// Default compression level for gzip in Avro (matches Java implementation) +const DEFAULT_GZIP_LEVEL: u8 = 9; +/// Default compression level for zstd in Avro (matches Java implementation) +const DEFAULT_ZSTD_LEVEL: u8 = 1; +/// Max supported level for ZSTD +const MAX_ZSTD_LEVEL: u8 = 22; + +/// Parse a codec name and optional level into a [`CompressionCodec`]. +/// +/// The codec name is parsed via [`CompressionCodec`]'s standard deserialization. +/// `"uncompressed"` (Avro-specific) is mapped to [`CompressionCodec::None`]. +/// Avro-specific defaults apply when `level` is `None`: gzip→9, zstd→1. +pub(crate) fn parse_avro_codec(codec: Option<&str>, level: Option) -> Result { + let Some(codec_str) = codec else { + return Ok(CompressionCodec::None); + }; + let normalized = if codec_str.eq_ignore_ascii_case(CODEC_UNCOMPRESSED) { + "none" + } else { + codec_str + }; + let parsed: CompressionCodec = serde_json::from_value(serde_json::Value::String( + normalized.to_string(), + )) + .map_err(|_| { + Error::new( + ErrorKind::DataInvalid, + format!("Unrecognized Avro compression codec: {codec_str}"), + ) + })?; + match parsed { + CompressionCodec::None => Ok(CompressionCodec::None), + CompressionCodec::Snappy => Ok(CompressionCodec::Snappy), + CompressionCodec::Gzip(_) => { + Ok(CompressionCodec::Gzip(level.unwrap_or(DEFAULT_GZIP_LEVEL))) + } + CompressionCodec::Zstd(_) => { + Ok(CompressionCodec::Zstd(level.unwrap_or(DEFAULT_ZSTD_LEVEL))) + } + other => Err(Error::new( + ErrorKind::DataInvalid, + format!("Unsupported Avro compression codec: {}", other.name()), + )), + } +} + +/// Convert a [`CompressionCodec`] to an [`apache_avro::Codec`] for use in Avro writers. +pub(crate) fn to_avro_codec(codec: CompressionCodec) -> Codec { + match codec { + CompressionCodec::None => Codec::Null, + CompressionCodec::Snappy => Codec::Snappy, + CompressionCodec::Lz4 => Codec::Null, + CompressionCodec::Gzip(level) => { + let compression_level = match level { + 0 => CompressionLevel::NoCompression, + 1 => CompressionLevel::BestSpeed, + 9 => CompressionLevel::BestCompression, + 10 => CompressionLevel::UberCompression, + _ => CompressionLevel::DefaultLevel, + }; + Codec::Deflate(DeflateSettings::new(compression_level)) + } + CompressionCodec::Zstd(level) => { + Codec::Zstandard(ZstandardSettings::new(level.min(MAX_ZSTD_LEVEL))) + } + } +} + +#[cfg(test)] +mod tests { + use apache_avro::{Codec, DeflateSettings, ZstandardSettings}; + use miniz_oxide::deflate::CompressionLevel; + use rstest::rstest; + + use super::*; + + #[rstest] + #[case::gzip_case_insensitive(Some("GZip"), Some(5), CompressionCodec::Gzip(5))] + #[case::gzip_avro_default_level(Some("gzip"), None, CompressionCodec::Gzip(DEFAULT_GZIP_LEVEL))] + #[case::zstd_explicit_level(Some("zstd"), Some(3), CompressionCodec::Zstd(3))] + #[case::zstd_avro_default_level(Some("zstd"), None, CompressionCodec::Zstd(DEFAULT_ZSTD_LEVEL))] + #[case::snappy(Some("snappy"), None, CompressionCodec::Snappy)] + #[case::uncompressed_avro_alias(Some("uncompressed"), None, CompressionCodec::None)] + #[case::no_codec(None, None, CompressionCodec::None)] + fn test_parse_avro_codec( + #[case] codec: Option<&str>, + #[case] level: Option, + #[case] expected: CompressionCodec, + ) { + assert_eq!(parse_avro_codec(codec, level).unwrap(), expected); + } + + #[rstest] + #[case::unknown_codec(Some("unknown"), Some(1), "unknown")] + #[case::lz4_unsupported(Some("lz4"), None, "lz4")] + fn test_parse_avro_codec_error( + #[case] codec: Option<&str>, + #[case] level: Option, + #[case] expected_msg: &str, + ) { + let err = parse_avro_codec(codec, level).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.to_string().contains(expected_msg), + "expected '{expected_msg}' in error: {err}" + ); + } + + #[rstest] + #[case::none(CompressionCodec::None, Codec::Null)] + #[case::snappy(CompressionCodec::Snappy, Codec::Snappy)] + #[case::gzip_best_compression( + CompressionCodec::Gzip(9), + Codec::Deflate(DeflateSettings::new(CompressionLevel::BestCompression)) + )] + #[case::gzip_default_level( + CompressionCodec::Gzip(5), + Codec::Deflate(DeflateSettings::new(CompressionLevel::DefaultLevel)) + )] + #[case::zstd(CompressionCodec::Zstd(3), Codec::Zstandard(ZstandardSettings::new(3)))] + #[case::zstd_level_clamped_to_max(CompressionCodec::Zstd(MAX_ZSTD_LEVEL + 1), Codec::Zstandard(ZstandardSettings::new(MAX_ZSTD_LEVEL)))] + fn test_to_avro_codec(#[case] input: CompressionCodec, #[case] expected: Codec) { + assert_eq!(to_avro_codec(input), expected); + } +} diff --git a/crates/iceberg/src/spec/manifest/mod.rs b/crates/iceberg/src/spec/manifest/mod.rs index c5a474ed19..d912b94869 100644 --- a/crates/iceberg/src/spec/manifest/mod.rs +++ b/crates/iceberg/src/spec/manifest/mod.rs @@ -165,6 +165,7 @@ mod tests { use tempfile::TempDir; use super::*; + use crate::compression::CompressionCodec; use crate::io::FileIO; use crate::spec::{Literal, NestedField, PrimitiveType, Struct, Transform, Type}; @@ -272,6 +273,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionCodec::None, ) .build_v2_data(); for entry in &entries { @@ -457,6 +459,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionCodec::None, ) .build_v2_data(); for entry in &entries { @@ -554,6 +557,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionCodec::None, ) .build_v1(); for entry in &entries { @@ -663,6 +667,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionCodec::None, ) .build_v1(); for entry in &entries { @@ -771,6 +776,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionCodec::None, ) .build_v2_data(); for entry in &entries { @@ -1050,6 +1056,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionCodec::None, ) .build_v2_data(); for entry in &entries { diff --git a/crates/iceberg/src/spec/manifest/writer.rs b/crates/iceberg/src/spec/manifest/writer.rs index 1b3b605fd8..38d40e2376 100644 --- a/crates/iceberg/src/spec/manifest/writer.rs +++ b/crates/iceberg/src/spec/manifest/writer.rs @@ -26,13 +26,14 @@ use super::{ Datum, FormatVersion, ManifestContentType, PartitionSpec, PrimitiveType, UNASSIGNED_SEQUENCE_NUMBER, }; +use crate::compression::CompressionCodec; use crate::error::Result; use crate::io::OutputFile; use crate::spec::manifest::_serde::{ManifestEntryV1, ManifestEntryV2}; use crate::spec::manifest::{manifest_schema_v1, manifest_schema_v2}; use crate::spec::{ DataContentType, DataFile, FieldSummary, ManifestEntry, ManifestFile, ManifestMetadata, - ManifestStatus, PrimitiveLiteral, SchemaRef, StructType, + ManifestStatus, PrimitiveLiteral, SchemaRef, StructType, avro_util, }; use crate::{Error, ErrorKind}; @@ -47,6 +48,7 @@ pub struct ManifestWriterBuilder { key_metadata: Option>, schema: SchemaRef, partition_spec: PartitionSpec, + compression: CompressionCodec, } impl ManifestWriterBuilder { @@ -57,6 +59,7 @@ impl ManifestWriterBuilder { key_metadata: Option>, schema: SchemaRef, partition_spec: PartitionSpec, + compression: CompressionCodec, ) -> Self { Self { output, @@ -64,6 +67,7 @@ impl ManifestWriterBuilder { key_metadata, schema, partition_spec, + compression, } } @@ -82,6 +86,7 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, + self.compression, ) } @@ -100,6 +105,7 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, + self.compression, ) } @@ -118,6 +124,7 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, + self.compression, ) } @@ -138,6 +145,7 @@ impl ManifestWriterBuilder { // First row id is assigned by the [`ManifestListWriter`] when the manifest // is added to the list. None, + self.compression, ) } @@ -156,6 +164,7 @@ impl ManifestWriterBuilder { self.key_metadata, metadata, None, + self.compression, ) } } @@ -181,6 +190,8 @@ pub struct ManifestWriter { manifest_entries: Vec, metadata: ManifestMetadata, + + compression: CompressionCodec, } impl ManifestWriter { @@ -191,6 +202,7 @@ impl ManifestWriter { key_metadata: Option>, metadata: ManifestMetadata, first_row_id: Option, + compression: CompressionCodec, ) -> Self { Self { output, @@ -206,6 +218,7 @@ impl ManifestWriter { key_metadata, manifest_entries: Vec::new(), metadata, + compression, } } @@ -414,7 +427,12 @@ impl ManifestWriter { // Manifest schema did not change between V2 and V3 FormatVersion::V2 | FormatVersion::V3 => manifest_schema_v2(&partition_type)?, }; - let mut avro_writer = AvroWriter::new(&avro_schema, Vec::new()); + + let mut avro_writer = AvroWriter::with_codec( + &avro_schema, + Vec::new(), + avro_util::to_avro_codec(self.compression), + ); avro_writer.add_user_metadata( "schema".to_string(), to_vec(table_schema).map_err(|err| { @@ -563,8 +581,13 @@ mod tests { use tempfile::TempDir; use super::*; + use crate::compression::CompressionCodec; use crate::io::FileIO; - use crate::spec::{DataFileFormat, Manifest, NestedField, PrimitiveType, Schema, Struct, Type}; + use crate::spec::{ + DataContentType, DataFileBuilder, DataFileFormat, Manifest, ManifestContentType, + ManifestEntry, ManifestMetadata, ManifestStatus, NestedField, PartitionSpec, PrimitiveType, + Schema, Struct, Type, + }; #[tokio::test] async fn test_add_delete_existing() { @@ -696,6 +719,7 @@ mod tests { None, metadata.schema.clone(), metadata.partition_spec.clone(), + CompressionCodec::None, ) .build_v2_data(); writer.add_entry(entries[0].clone()).unwrap(); @@ -716,6 +740,91 @@ mod tests { assert_eq!(actual_manifest, Manifest::new(metadata, entries)); } + #[tokio::test] + async fn test_manifest_writer_with_compression() { + let metadata = { + let schema = Schema::builder() + .with_fields(vec![Arc::new(NestedField::required( + 1, + "id", + Type::Primitive(PrimitiveType::Int), + ))]) + .build() + .unwrap(); + + ManifestMetadata { + schema_id: 0, + schema: Arc::new(schema), + partition_spec: PartitionSpec::unpartition_spec(), + format_version: FormatVersion::V2, + content: ManifestContentType::Data, + } + }; + + async fn write_manifest( + io: &FileIO, + path: &std::path::Path, + metadata: &ManifestMetadata, + compression: CompressionCodec, + ) { + let output_file = io.new_output(path.to_str().unwrap()).unwrap(); + let mut writer = ManifestWriterBuilder::new( + output_file, + Some(1), + None, + metadata.schema.clone(), + metadata.partition_spec.clone(), + compression, + ) + .build_v2_data(); + for i in 0..1000 { + let data_file = DataFileBuilder::default() + .content(DataContentType::Data) + .file_path(format!( + "/very/long/path/to/data/directory/with/many/subdirectories/file_{i}.parquet" + )) + .file_format(DataFileFormat::Parquet) + .partition(Struct::empty()) + .file_size_in_bytes(100000 + i) + .record_count(1000 + i) + .build() + .unwrap(); + let entry = ManifestEntry::builder() + .status(ManifestStatus::Added) + .snapshot_id(1) + .sequence_number(1) + .file_sequence_number(1) + .data_file(data_file) + .build(); + writer.add_entry(entry).unwrap(); + } + writer.write_manifest_file().await.unwrap(); + } + + let tmp_dir = TempDir::new().unwrap(); + let io = FileIO::new_with_fs(); + let uncompressed_path = tmp_dir.path().join("uncompressed_manifest.avro"); + let compressed_path = tmp_dir.path().join("compressed_manifest.avro"); + + write_manifest(&io, &uncompressed_path, &metadata, CompressionCodec::None).await; + write_manifest(&io, &compressed_path, &metadata, CompressionCodec::Gzip(9)).await; + + let uncompressed_size = fs::metadata(&uncompressed_path).unwrap().len(); + let compressed_size = fs::metadata(&compressed_path).unwrap().len(); + + // Verify compression is actually working + assert!( + compressed_size < uncompressed_size, + "Compressed size ({compressed_size}) should be less than uncompressed size ({uncompressed_size})" + ); + + // Verify the compressed file can be read back correctly + let compressed_bytes = fs::read(&compressed_path).unwrap(); + let manifest = Manifest::parse_avro(&compressed_bytes).unwrap(); + assert_eq!(manifest.metadata.format_version, FormatVersion::V2); + assert_eq!(manifest.entries.len(), 1000); + } + #[tokio::test] async fn test_v3_delete_manifest_delete_file_roundtrip() { let schema = Arc::new( @@ -784,6 +893,7 @@ mod tests { None, schema.clone(), partition_spec.clone(), + CompressionCodec::None, ) .build_v3_deletes(); diff --git a/crates/iceberg/src/spec/manifest_list.rs b/crates/iceberg/src/spec/manifest_list.rs index baaab1f590..577e80f3cf 100644 --- a/crates/iceberg/src/spec/manifest_list.rs +++ b/crates/iceberg/src/spec/manifest_list.rs @@ -29,8 +29,10 @@ use serde_derive::{Deserialize, Serialize}; use self::_const_schema::{MANIFEST_LIST_AVRO_SCHEMA_V1, MANIFEST_LIST_AVRO_SCHEMA_V2}; use self::_serde::{ManifestFileV1, ManifestFileV2}; use super::{FormatVersion, Manifest}; +use crate::compression::CompressionCodec; use crate::error::Result; use crate::io::{FileIO, OutputFile}; +use crate::spec::avro_util; use crate::spec::manifest_list::_const_schema::MANIFEST_LIST_AVRO_SCHEMA_V3; use crate::spec::manifest_list::_serde::ManifestFileV3; use crate::{Error, ErrorKind}; @@ -117,7 +119,12 @@ impl ManifestListWriter { } /// Construct a v1 [`ManifestListWriter`] that writes to a provided [`OutputFile`]. - pub fn v1(output_file: OutputFile, snapshot_id: i64, parent_snapshot_id: Option) -> Self { + pub fn v1( + output_file: OutputFile, + snapshot_id: i64, + parent_snapshot_id: Option, + compression: CompressionCodec, + ) -> Self { let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), ("format-version".to_string(), "1".to_string()), @@ -135,6 +142,7 @@ impl ManifestListWriter { 0, snapshot_id, None, + compression, ) } @@ -144,6 +152,7 @@ impl ManifestListWriter { snapshot_id: i64, parent_snapshot_id: Option, sequence_number: i64, + compression: CompressionCodec, ) -> Self { let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), @@ -163,6 +172,7 @@ impl ManifestListWriter { sequence_number, snapshot_id, None, + compression, ) } @@ -173,6 +183,7 @@ impl ManifestListWriter { parent_snapshot_id: Option, sequence_number: i64, first_row_id: Option, // Always None for delete manifests + compression: CompressionCodec, ) -> Self { let mut metadata = HashMap::from_iter([ ("snapshot-id".to_string(), snapshot_id.to_string()), @@ -198,6 +209,7 @@ impl ManifestListWriter { sequence_number, snapshot_id, first_row_id, + compression, ) } @@ -208,13 +220,19 @@ impl ManifestListWriter { sequence_number: i64, snapshot_id: i64, first_row_id: Option, + compression: CompressionCodec, ) -> Self { let avro_schema = match format_version { FormatVersion::V1 => &MANIFEST_LIST_AVRO_SCHEMA_V1, FormatVersion::V2 => &MANIFEST_LIST_AVRO_SCHEMA_V2, FormatVersion::V3 => &MANIFEST_LIST_AVRO_SCHEMA_V3, }; - let mut avro_writer = Writer::new(avro_schema, Vec::new()); + + let mut avro_writer = Writer::with_codec( + avro_schema, + Vec::new(), + avro_util::to_avro_codec(compression), + ); for (key, value) in metadata { avro_writer .add_user_metadata(key, value) @@ -1365,8 +1383,9 @@ mod test { use tempfile::TempDir; use super::_serde::ManifestListV2; + use crate::compression::CompressionCodec; use crate::io::FileIO; - use crate::spec::manifest_list::_serde::{ManifestListV1, ManifestListV3}; + use crate::spec::manifest_list::_serde::{ManifestFileV1, ManifestListV1, ManifestListV3}; use crate::spec::{ Datum, FieldSummary, ManifestContentType, ManifestFile, ManifestList, ManifestListWriter, UNASSIGNED_SEQUENCE_NUMBER, @@ -1407,6 +1426,7 @@ mod test { file_io.new_output(full_path.clone()).unwrap(), 1646658105718557341, Some(1646658105718557341), + CompressionCodec::None, ); writer @@ -1480,6 +1500,7 @@ mod test { 1646658105718557341, Some(1646658105718557341), 1, + CompressionCodec::None, ); writer @@ -1554,6 +1575,7 @@ mod test { Some(377075049360453639), 1, Some(10), + CompressionCodec::None, ); writer @@ -1690,7 +1712,12 @@ mod test { let io = FileIO::new_with_fs(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0)); + let mut writer = ManifestListWriter::v1( + output_file, + 1646658105718557341, + Some(0), + CompressionCodec::None, + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1737,7 +1764,13 @@ mod test { let io = FileIO::new_with_fs(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num); + let mut writer = ManifestListWriter::v2( + output_file, + snapshot_id, + Some(0), + seq_num, + CompressionCodec::None, + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1785,8 +1818,14 @@ mod test { let io = FileIO::new_with_fs(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = - ManifestListWriter::v3(output_file, snapshot_id, Some(0), seq_num, Some(10)); + let mut writer = ManifestListWriter::v3( + output_file, + snapshot_id, + Some(0), + seq_num, + Some(10), + CompressionCodec::None, + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1833,7 +1872,12 @@ mod test { let io = FileIO::new_with_fs(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0)); + let mut writer = ManifestListWriter::v1( + output_file, + 1646658105718557341, + Some(0), + CompressionCodec::None, + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1878,7 +1922,12 @@ mod test { let io = FileIO::new_with_fs(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v1(output_file, 1646658105718557341, Some(0)); + let mut writer = ManifestListWriter::v1( + output_file, + 1646658105718557341, + Some(0), + CompressionCodec::None, + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1925,7 +1974,13 @@ mod test { let io = FileIO::new_with_fs(); let output_file = io.new_output(path.to_str().unwrap()).unwrap(); - let mut writer = ManifestListWriter::v2(output_file, snapshot_id, Some(0), seq_num); + let mut writer = ManifestListWriter::v2( + output_file, + snapshot_id, + Some(0), + seq_num, + CompressionCodec::None, + ); writer .add_manifests(expected_manifest_list.entries.clone().into_iter()) .unwrap(); @@ -1994,8 +2049,6 @@ mod test { #[test] fn test_manifest_file_v1_to_v2_projection() { - use crate::spec::manifest_list::_serde::ManifestFileV1; - // Create a V1 manifest file object (without V2 fields) let v1_manifest = ManifestFileV1 { manifest_path: "/test/manifest.avro".to_string(), @@ -2044,4 +2097,89 @@ mod test { assert_eq!(v2_manifest.partitions, None); assert_eq!(v2_manifest.key_metadata, None); } + + #[tokio::test] + async fn test_manifest_list_writer_with_compression() { + // Create multiple manifest entries to make compression effective + let mut entries = Vec::new(); + for i in 0..100 { + entries.push(ManifestFile { + manifest_path: format!("/test/manifest{i}.avro"), + manifest_length: 1000 + i, + partition_spec_id: 0, + content: ManifestContentType::Data, + sequence_number: 1, + min_sequence_number: 1, + added_snapshot_id: 1646658105718557341, + added_files_count: Some(10), + existing_files_count: Some(5), + deleted_files_count: Some(2), + added_rows_count: Some(100), + existing_rows_count: Some(50), + deleted_rows_count: Some(20), + partitions: None, + key_metadata: None, + first_row_id: None, + }); + } + let manifest_list = ManifestList { entries }; + + let file_io = FileIO::new_with_fs(); + let tmp_dir = TempDir::new().unwrap(); + + // Write uncompressed manifest list + let uncompressed_path = tmp_dir + .path() + .join("uncompressed_manifest_list.avro") + .to_str() + .unwrap() + .to_string(); + let mut writer = ManifestListWriter::v2( + file_io.new_output(&uncompressed_path).unwrap(), + 1646658105718557341, + Some(0), + 1, + CompressionCodec::None, + ); + writer + .add_manifests(manifest_list.entries.clone().into_iter()) + .unwrap(); + writer.close().await.unwrap(); + let uncompressed_size = fs::metadata(&uncompressed_path).unwrap().len(); + + // Write compressed manifest list with gzip + let compressed_path = tmp_dir + .path() + .join("compressed_manifest_list.avro") + .to_str() + .unwrap() + .to_string(); + + let compression = CompressionCodec::Gzip(9); + let mut writer = ManifestListWriter::v2( + file_io.new_output(&compressed_path).unwrap(), + 1646658105718557341, + Some(0), + 1, + compression, + ); + writer + .add_manifests(manifest_list.entries.clone().into_iter()) + .unwrap(); + writer.close().await.unwrap(); + let compressed_size = fs::metadata(&compressed_path).unwrap().len(); + + // Verify compression is actually working + assert!( + compressed_size < uncompressed_size, + "Compressed size ({compressed_size}) should be less than uncompressed size ({uncompressed_size})" + ); + + // Verify the compressed file can be read back correctly + let compressed_bytes = fs::read(&compressed_path).unwrap(); + let parsed_manifest_list = + ManifestList::parse_with_version(&compressed_bytes, crate::spec::FormatVersion::V2) + .unwrap(); + assert_eq!(manifest_list, parsed_manifest_list); + } } diff --git a/crates/iceberg/src/spec/mod.rs b/crates/iceberg/src/spec/mod.rs index b23ca1eda0..8822dae64f 100644 --- a/crates/iceberg/src/spec/mod.rs +++ b/crates/iceberg/src/spec/mod.rs @@ -17,6 +17,7 @@ //! Spec for Iceberg. +mod avro_util; mod datatypes; mod encrypted_key; mod manifest; diff --git a/crates/iceberg/src/spec/table_properties.rs b/crates/iceberg/src/spec/table_properties.rs index a3d4e7fdaa..1270e5690c 100644 --- a/crates/iceberg/src/spec/table_properties.rs +++ b/crates/iceberg/src/spec/table_properties.rs @@ -21,6 +21,7 @@ use std::str::FromStr; use crate::compression::CompressionCodec; use crate::error::{Error, ErrorKind, Result}; +use crate::spec::avro_util; // Helper function to parse a property from a HashMap // If the property is not found, use the default value @@ -114,6 +115,8 @@ pub struct TableProperties { pub write_format_default: String, /// The target file size for files. pub write_target_file_size_bytes: usize, + /// Compression codec for Avro files (manifests, manifest lists) + pub avro_compression_codec: CompressionCodec, /// Compression codec for metadata files (JSON) pub metadata_compression_codec: CompressionCodec, /// Whether to use `FanoutWriter` for partitioned tables. @@ -210,6 +213,14 @@ impl TableProperties { /// Default target file size pub const PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT: usize = 512 * 1024 * 1024; // 512 MB + /// Compression codec for Avro files (manifests, manifest lists) + pub const PROPERTY_AVRO_COMPRESSION_CODEC: &str = "write.avro.compression-codec"; + /// Default Avro compression codec - gzip + pub const PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT: &str = "gzip"; + + /// Compression level for Avro files + pub const PROPERTY_AVRO_COMPRESSION_LEVEL: &str = "write.avro.compression-level"; + /// Compression codec for metadata files (JSON) pub const PROPERTY_METADATA_COMPRESSION_CODEC: &str = "write.metadata.compression-codec"; /// Default metadata compression codec - uncompressed @@ -264,6 +275,27 @@ impl TryFrom<&HashMap> for TableProperties { TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES, TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT, )?, + avro_compression_codec: { + let codec_str = props + .get(TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC) + .map(|s| s.as_str()) + .unwrap_or(TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC_DEFAULT); + let level = props + .get(TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL) + .map(|s| { + s.parse::().map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid value for {}: {e}", + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL + ), + ) + }) + }) + .transpose()?; + avro_util::parse_avro_codec(Some(codec_str), level)? + }, metadata_compression_codec: parse_metadata_file_compression(props)?, write_datafusion_fanout_enabled: parse_property( props, @@ -308,17 +340,47 @@ mod tests { table_properties.write_target_file_size_bytes, TableProperties::PROPERTY_WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT ); + // Test compression defaults - gzip with Avro default level (9) + assert_eq!( + table_properties.avro_compression_codec, + CompressionCodec::Gzip(9) + ); // Test compression defaults (none means CompressionCodec::None) assert_eq!( table_properties.metadata_compression_codec, CompressionCodec::None ); + // Test datafusion fanout writer default + assert_eq!( + table_properties.write_datafusion_fanout_enabled, + TableProperties::PROPERTY_DATAFUSION_WRITE_FANOUT_ENABLED_DEFAULT + ); assert_eq!( table_properties.gc_enabled, TableProperties::PROPERTY_GC_ENABLED_DEFAULT ); } + #[test] + fn test_table_properties_avro_compression() { + let props = HashMap::from([ + ( + TableProperties::PROPERTY_AVRO_COMPRESSION_CODEC.to_string(), + "zstd".to_string(), + ), + ( + TableProperties::PROPERTY_AVRO_COMPRESSION_LEVEL.to_string(), + "3".to_string(), + ), + ]); + let table_properties = TableProperties::try_from(&props).unwrap(); + // Check that it parsed to a Zstd codec with level 3 + assert_eq!( + table_properties.avro_compression_codec, + CompressionCodec::Zstd(3) + ); + } + #[test] fn test_table_properties_compression() { let props = HashMap::from([( diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs index c8bf26a174..743c925247 100644 --- a/crates/iceberg/src/transaction/snapshot.rs +++ b/crates/iceberg/src/transaction/snapshot.rs @@ -233,6 +233,17 @@ impl<'a> SnapshotProducer<'a> { DataFileFormat::Avro ); let output_file = self.table.file_io().new_output(new_manifest_path)?; + + // Get compression settings from table properties + let table_props = + TableProperties::try_from(self.table.metadata().properties()).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Failed to parse table properties for compression settings", + ) + .with_source(e) + })?; + let builder = ManifestWriterBuilder::new( output_file, Some(self.snapshot_id), @@ -243,7 +254,9 @@ impl<'a> SnapshotProducer<'a> { .default_partition_spec() .as_ref() .clone(), + table_props.avro_compression_codec, ); + match self.table.metadata().format_version() { FormatVersion::V1 => Ok(builder.build_v1()), FormatVersion::V2 => match content { @@ -424,6 +437,19 @@ impl<'a> SnapshotProducer<'a> { let manifest_list_path = self.generate_manifest_list_file_path(0); let next_seq_num = self.table.metadata().next_sequence_number(); let first_row_id = self.table.metadata().next_row_id(); + + // Get compression settings from table properties + let table_props = + TableProperties::try_from(self.table.metadata().properties()).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Failed to parse table properties for compression settings", + ) + .with_source(e) + })?; + + let compression = table_props.avro_compression_codec; + let mut manifest_list_writer = match self.table.metadata().format_version() { FormatVersion::V1 => ManifestListWriter::v1( self.table @@ -431,6 +457,7 @@ impl<'a> SnapshotProducer<'a> { .new_output(manifest_list_path.clone())?, self.snapshot_id, self.table.metadata().current_snapshot_id(), + compression, ), FormatVersion::V2 => ManifestListWriter::v2( self.table @@ -439,6 +466,7 @@ impl<'a> SnapshotProducer<'a> { self.snapshot_id, self.table.metadata().current_snapshot_id(), next_seq_num, + compression, ), FormatVersion::V3 => ManifestListWriter::v3( self.table @@ -448,6 +476,7 @@ impl<'a> SnapshotProducer<'a> { self.table.metadata().current_snapshot_id(), next_seq_num, Some(first_row_id), + compression, ), };