Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
665b970
feat!: Support compression codecs for JSON metadata and Avro
emkornfield Nov 12, 2025
091d3bc
fmt
emkornfield Nov 13, 2025
41a8c1c
fix clippy
emkornfield Nov 13, 2025
51e781e
clippy again
emkornfield Nov 13, 2025
253bf59
wip
emkornfield Nov 16, 2025
8bdb52d
address comments
emkornfield Nov 16, 2025
9d27116
no clone needed
emkornfield Nov 16, 2025
d1ee0b2
test compression works
emkornfield Nov 16, 2025
393622f
comments
emkornfield Nov 17, 2025
46fdb8e
update tests
emkornfield Nov 17, 2025
d50cb7d
address comments
emkornfield Nov 19, 2025
5370f77
remove parse optional property
emkornfield Nov 19, 2025
6b3d8ed
fmt
emkornfield Nov 19, 2025
5ec090f
wip
emkornfield Nov 21, 2025
4337a34
address comments
emkornfield Nov 21, 2025
dba26a1
put parsing in table properties
emkornfield Dec 11, 2025
d81ba56
cargo fmt
emkornfield Dec 16, 2025
23384ab
fmt
emkornfield Dec 16, 2025
173fbef
remove unneeded tests
emkornfield Dec 16, 2025
a52d015
fix package import
emkornfield Dec 16, 2025
f4dd663
clippy and visibility
emkornfield Dec 16, 2025
6745adf
merge main
emkornfield Dec 16, 2025
95a217e
fix
emkornfield Dec 16, 2025
ef41b4d
update cargo lock
emkornfield Dec 16, 2025
d9603ff
add missing args
emkornfield Dec 16, 2025
e43e0cc
address clippy
emkornfield Dec 16, 2025
0ec5784
Fmt
emkornfield Dec 16, 2025
ed71f0e
move use statements to top level
emkornfield Dec 16, 2025
edb6886
remove duplicate imports
emkornfield Dec 16, 2025
9d361e1
Merge databricks/main into fix_compression
emkornfield Jan 5, 2026
1a5e8fc
Merge remote-tracking branch 'databricks/main' into fix_compression
emkornfield Jan 9, 2026
1ca553a
Merge remote-tracking branch 'databricks/main' into fix_compression
emkornfield Mar 11, 2026
6dcd177
Merge remote-tracking branch 'databricks/main' into fix_compression
emkornfield Mar 19, 2026
26f6d45
update to use compression codec
emkornfield Mar 19, 2026
50bfe44
fmt
emkornfield Mar 19, 2026
175e61f
Merge remote-tracking branch 'databricks/main' into fix_compression
emkornfield Mar 31, 2026
722cb55
fmt
emkornfield Mar 31, 2026
05db1d7
simplify test
emkornfield Mar 31, 2026
da34d53
fixes
emkornfield Mar 31, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ rust-version = "1.92"
aes = { version = "0.8", features = ["zeroize"] }
aes-gcm = "0.10"
anyhow = "1.0.72"
apache-avro = { version = "0.21", features = ["zstandard"] }
apache-avro = { version = "0.21", features = ["zstandard", "snappy"] }
array-init = "2"
arrow-arith = "58"
arrow-array = "58"
Expand Down Expand Up @@ -98,6 +98,7 @@ log = "0.4.28"
metainfo = "0.7.14"
mimalloc = "0.1.46"
minijinja = "2.12.0"
miniz_oxide = "0.8"
mockall = "0.13.1"
mockito = "1"
motore-macros = "0.4.3"
Expand Down
3 changes: 3 additions & 0 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ flate2 = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
miniz_oxide = { workspace = true }
moka = { version = "0.12.10", features = ["future"] }
murmur3 = { workspace = true }
once_cell = { workspace = true }
Expand Down Expand Up @@ -89,6 +91,7 @@ mockall = { workspace = true }
pretty_assertions = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
rstest = { workspace = true }
tempfile = { workspace = true }
minijinja = { workspace = true }

Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl CompressionCodec {
///
/// # Errors
///
/// Returns an error for Lz4 and Zstd as they are not fully supported.
/// Returns an error for Lz4, Zstd, and Snappy as they are not fully supported.
pub fn suffix(&self) -> Result<&'static str> {
match self {
CompressionCodec::None => Ok(""),
Expand Down
3 changes: 3 additions & 0 deletions crates/iceberg/src/io/object_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ mod tests {

use super::*;
use crate::TableIdent;
use crate::compression::CompressionCodec;
use crate::io::{FileIO, OutputFile};
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Literal, ManifestEntry,
Expand Down Expand Up @@ -272,6 +273,7 @@ mod tests {
None,
current_schema.clone(),
current_partition_spec.as_ref().clone(),
CompressionCodec::None,
)
.build_v2_data();
writer
Expand Down Expand Up @@ -304,6 +306,7 @@ mod tests {
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
CompressionCodec::None,
);
manifest_list_write
.add_manifests(vec![data_file_manifest].into_iter())
Expand Down
3 changes: 2 additions & 1 deletion crates/iceberg/src/puffin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,10 @@ mod tests {
assert!(validate_puffin_compression(CompressionCodec::None).is_ok());
assert!(validate_puffin_compression(CompressionCodec::Lz4).is_ok());
assert!(validate_puffin_compression(CompressionCodec::zstd_default()).is_ok());
assert!(validate_puffin_compression(CompressionCodec::Zstd(5)).is_ok());
assert!(validate_puffin_compression(CompressionCodec::Zstd(3)).is_ok());

// Unsupported codecs
assert!(validate_puffin_compression(CompressionCodec::gzip_default()).is_err());
assert!(validate_puffin_compression(CompressionCodec::Snappy).is_err());
}
}
14 changes: 9 additions & 5 deletions crates/iceberg/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ pub mod tests {
//! shared tests for the table scan API
#![allow(missing_docs)]

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::fs;
use std::fs::File;
use std::sync::Arc;
Expand All @@ -586,6 +586,7 @@ pub mod tests {

use crate::TableIdent;
use crate::arrow::ArrowReaderBuilder;
use crate::compression::CompressionCodec;
use crate::expr::{BoundPredicate, Reference};
use crate::io::{FileIO, OutputFile};
use crate::metadata_columns::RESERVED_COL_NAME_FILE;
Expand Down Expand Up @@ -756,6 +757,7 @@ pub mod tests {
None,
current_schema.clone(),
current_partition_spec.as_ref().clone(),
CompressionCodec::None,
)
.build_v2_data();
writer
Expand Down Expand Up @@ -833,6 +835,7 @@ pub mod tests {
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
CompressionCodec::None,
);
manifest_list_write
.add_manifests(vec![data_file_manifest].into_iter())
Expand Down Expand Up @@ -980,6 +983,7 @@ pub mod tests {
None,
current_schema.clone(),
current_partition_spec.as_ref().clone(),
CompressionCodec::None,
)
.build_v2_data();

Expand Down Expand Up @@ -1064,6 +1068,7 @@ pub mod tests {
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
CompressionCodec::None,
);
manifest_list_write
.add_manifests(vec![data_file_manifest].into_iter())
Expand All @@ -1086,6 +1091,7 @@ pub mod tests {
None,
current_schema.clone(),
current_partition_spec.as_ref().clone(),
CompressionCodec::None,
)
.build_v2_data();

Expand Down Expand Up @@ -1121,6 +1127,7 @@ pub mod tests {
None,
current_schema.clone(),
current_partition_spec.as_ref().clone(),
CompressionCodec::None,
)
.build_v2_deletes();

Expand Down Expand Up @@ -1155,6 +1162,7 @@ pub mod tests {
current_snapshot.snapshot_id(),
current_snapshot.parent_snapshot_id(),
current_snapshot.sequence_number(),
CompressionCodec::None,
);
manifest_list_write
.add_manifests(vec![data_manifest, delete_manifest].into_iter())
Expand Down Expand Up @@ -1812,8 +1820,6 @@ pub mod tests {

#[tokio::test]
async fn test_select_with_file_column() {
use arrow_array::cast::AsArray;

let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

Expand Down Expand Up @@ -1935,8 +1941,6 @@ pub mod tests {

#[tokio::test]
async fn test_file_column_with_multiple_files() {
use std::collections::HashSet;

let mut fixture = TableTestFixture::new();
fixture.setup_manifest_files().await;

Expand Down
153 changes: 153 additions & 0 deletions crates/iceberg/src/spec/avro_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Utilities for working with Apache Avro in Iceberg.

use apache_avro::{Codec, DeflateSettings, ZstandardSettings};
use miniz_oxide::deflate::CompressionLevel;

use crate::compression::CompressionCodec;
use crate::{Error, ErrorKind, Result};

/// Codec name for uncompressed (Avro-specific; maps to [`CompressionCodec::None`])
const CODEC_UNCOMPRESSED: &str = "uncompressed";

/// Default compression level for gzip in Avro (matches Java implementation)
const DEFAULT_GZIP_LEVEL: u8 = 9;
/// Default compression level for zstd in Avro (matches Java implementation)
const DEFAULT_ZSTD_LEVEL: u8 = 1;
/// Max supported level for ZSTD; requested levels above this are clamped down
/// in `to_avro_codec` rather than rejected.
const MAX_ZSTD_LEVEL: u8 = 22;

/// Parse a codec name and optional level into a [`CompressionCodec`].
///
/// The codec name is parsed via [`CompressionCodec`]'s standard deserialization.
/// `"uncompressed"` (Avro-specific) is mapped to [`CompressionCodec::None`].
/// Avro-specific defaults apply when `level` is `None`: gzip→9, zstd→1.
pub(crate) fn parse_avro_codec(codec: Option<&str>, level: Option<u8>) -> Result<CompressionCodec> {
let Some(codec_str) = codec else {
return Ok(CompressionCodec::None);
};
let normalized = if codec_str.eq_ignore_ascii_case(CODEC_UNCOMPRESSED) {
"none"
} else {
codec_str
};
let parsed: CompressionCodec = serde_json::from_value(serde_json::Value::String(
normalized.to_string(),
))
.map_err(|_| {
Error::new(
ErrorKind::DataInvalid,
format!("Unrecognized Avro compression codec: {codec_str}"),
)
})?;
match parsed {
CompressionCodec::None => Ok(CompressionCodec::None),
CompressionCodec::Snappy => Ok(CompressionCodec::Snappy),
CompressionCodec::Gzip(_) => {
Ok(CompressionCodec::Gzip(level.unwrap_or(DEFAULT_GZIP_LEVEL)))
}
CompressionCodec::Zstd(_) => {
Ok(CompressionCodec::Zstd(level.unwrap_or(DEFAULT_ZSTD_LEVEL)))
}
other => Err(Error::new(
ErrorKind::DataInvalid,
format!("Unsupported Avro compression codec: {}", other.name()),
)),
}
}

/// Convert a [`CompressionCodec`] to an [`apache_avro::Codec`] for use in Avro writers.
/// Convert a [`CompressionCodec`] into the [`apache_avro::Codec`] used when
/// writing Avro files.
///
/// NOTE(review): `Lz4` has no Avro counterpart and silently falls back to
/// `Codec::Null` (no compression); this presumes callers already rejected
/// lz4 via `parse_avro_codec` — confirm no writer path reaches here with it.
/// Gzip levels other than 0/1/9/10 collapse to miniz_oxide's default level,
/// because [`CompressionLevel`] is an enum rather than a numeric knob.
pub(crate) fn to_avro_codec(codec: CompressionCodec) -> Codec {
    match codec {
        CompressionCodec::None | CompressionCodec::Lz4 => Codec::Null,
        CompressionCodec::Snappy => Codec::Snappy,
        CompressionCodec::Gzip(level) => {
            let deflate_level = match level {
                0 => CompressionLevel::NoCompression,
                1 => CompressionLevel::BestSpeed,
                9 => CompressionLevel::BestCompression,
                10 => CompressionLevel::UberCompression,
                // Intermediate levels are not representable; use the default.
                _ => CompressionLevel::DefaultLevel,
            };
            Codec::Deflate(DeflateSettings::new(deflate_level))
        }
        CompressionCodec::Zstd(level) => {
            // Clamp to the maximum level zstd supports instead of erroring.
            let clamped = level.min(MAX_ZSTD_LEVEL);
            Codec::Zstandard(ZstandardSettings::new(clamped))
        }
    }
}

#[cfg(test)]
mod tests {
    use apache_avro::{Codec, DeflateSettings, ZstandardSettings};
    use miniz_oxide::deflate::CompressionLevel;
    use rstest::rstest;

    use super::*;

    // Success paths for `parse_avro_codec`: case-insensitive names, the
    // Avro-only "uncompressed" alias, explicit levels, and the Avro-specific
    // default levels (gzip→9, zstd→1) when no level is supplied.
    #[rstest]
    #[case::gzip_case_insensitive(Some("GZip"), Some(5), CompressionCodec::Gzip(5))]
    #[case::gzip_avro_default_level(Some("gzip"), None, CompressionCodec::Gzip(DEFAULT_GZIP_LEVEL))]
    #[case::zstd_explicit_level(Some("zstd"), Some(3), CompressionCodec::Zstd(3))]
    #[case::zstd_avro_default_level(Some("zstd"), None, CompressionCodec::Zstd(DEFAULT_ZSTD_LEVEL))]
    #[case::snappy(Some("snappy"), None, CompressionCodec::Snappy)]
    #[case::uncompressed_avro_alias(Some("uncompressed"), None, CompressionCodec::None)]
    #[case::no_codec(None, None, CompressionCodec::None)]
    fn test_parse_avro_codec(
        #[case] codec: Option<&str>,
        #[case] level: Option<u8>,
        #[case] expected: CompressionCodec,
    ) {
        assert_eq!(parse_avro_codec(codec, level).unwrap(), expected);
    }

    // Error paths: names that don't deserialize at all, and codecs that
    // parse but are not supported for Avro (lz4). Both must surface
    // `DataInvalid` with the offending name in the message.
    #[rstest]
    #[case::unknown_codec(Some("unknown"), Some(1), "unknown")]
    #[case::lz4_unsupported(Some("lz4"), None, "lz4")]
    fn test_parse_avro_codec_error(
        #[case] codec: Option<&str>,
        #[case] level: Option<u8>,
        #[case] expected_msg: &str,
    ) {
        let err = parse_avro_codec(codec, level).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::DataInvalid);
        assert!(
            err.to_string().contains(expected_msg),
            "expected '{expected_msg}' in error: {err}"
        );
    }

    // Mapping into `apache_avro::Codec`, including the lossy gzip level
    // mapping (5 → DefaultLevel) and zstd clamping at MAX_ZSTD_LEVEL.
    #[rstest]
    #[case::none(CompressionCodec::None, Codec::Null)]
    #[case::snappy(CompressionCodec::Snappy, Codec::Snappy)]
    #[case::gzip_best_compression(
        CompressionCodec::Gzip(9),
        Codec::Deflate(DeflateSettings::new(CompressionLevel::BestCompression))
    )]
    #[case::gzip_default_level(
        CompressionCodec::Gzip(5),
        Codec::Deflate(DeflateSettings::new(CompressionLevel::DefaultLevel))
    )]
    #[case::zstd(CompressionCodec::Zstd(3), Codec::Zstandard(ZstandardSettings::new(3)))]
    #[case::zstd_level_clamped_to_max(CompressionCodec::Zstd(MAX_ZSTD_LEVEL + 1), Codec::Zstandard(ZstandardSettings::new(MAX_ZSTD_LEVEL)))]
    fn test_to_avro_codec(#[case] input: CompressionCodec, #[case] expected: Codec) {
        assert_eq!(to_avro_codec(input), expected);
    }
}
7 changes: 7 additions & 0 deletions crates/iceberg/src/spec/manifest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ mod tests {
use tempfile::TempDir;

use super::*;
use crate::compression::CompressionCodec;
use crate::io::FileIO;
use crate::spec::{Literal, NestedField, PrimitiveType, Struct, Transform, Type};

Expand Down Expand Up @@ -272,6 +273,7 @@ mod tests {
None,
metadata.schema.clone(),
metadata.partition_spec.clone(),
CompressionCodec::None,
)
.build_v2_data();
for entry in &entries {
Expand Down Expand Up @@ -457,6 +459,7 @@ mod tests {
None,
metadata.schema.clone(),
metadata.partition_spec.clone(),
CompressionCodec::None,
)
.build_v2_data();
for entry in &entries {
Expand Down Expand Up @@ -554,6 +557,7 @@ mod tests {
None,
metadata.schema.clone(),
metadata.partition_spec.clone(),
CompressionCodec::None,
)
.build_v1();
for entry in &entries {
Expand Down Expand Up @@ -663,6 +667,7 @@ mod tests {
None,
metadata.schema.clone(),
metadata.partition_spec.clone(),
CompressionCodec::None,
)
.build_v1();
for entry in &entries {
Expand Down Expand Up @@ -771,6 +776,7 @@ mod tests {
None,
metadata.schema.clone(),
metadata.partition_spec.clone(),
CompressionCodec::None,
)
.build_v2_data();
for entry in &entries {
Expand Down Expand Up @@ -1050,6 +1056,7 @@ mod tests {
None,
metadata.schema.clone(),
metadata.partition_spec.clone(),
CompressionCodec::None,
)
.build_v2_data();
for entry in &entries {
Expand Down
Loading
Loading