Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -71,6 +71,7 @@ enum-ordinalize = "4.3.0"
 env_logger = "0.11.8"
 expect-test = "1"
 faststr = "0.2.31"
+flate2 = "1.1.5"
 fnv = "1.0.7"
 fs-err = "3.1.0"
 futures = "0.3"
1 change: 1 addition & 0 deletions crates/iceberg/Cargo.toml
@@ -63,6 +63,7 @@ bytes = { workspace = true }
 chrono = { workspace = true }
 derive_builder = { workspace = true }
 expect-test = { workspace = true }
+flate2 = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
50 changes: 49 additions & 1 deletion crates/iceberg/src/spec/table_metadata.rs
@@ -22,10 +22,12 @@ use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::fmt::{Display, Formatter};
 use std::hash::Hash;
+use std::io::Read as _;
 use std::sync::Arc;
 
 use _serde::TableMetadataEnum;
 use chrono::{DateTime, Utc};
+use flate2::read::GzDecoder;
[Review thread]
Collaborator: When you go to read metadata_content it's already in memory as a &[u8], so I think we should use flate2::bufread::GzDecoder here. It might be an imperceptible performance difference, but you never know how big metadata might get :)

Contributor (author): Hm, should be the opposite, no? With bufread we'll pay for an extra copy, but the "syscalls" (read) are free.

Collaborator: Yeah you're right, I had it backwards in my head, sorry about that!
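For reference, a minimal sketch of the two decoder flavors under discussion; the helper name and usage are illustrative, not part of the patch. A byte slice implements both Read and BufRead, so either constructor accepts it directly:

    use std::io::Read as _;

    fn gunzip(gz_bytes: &[u8]) -> std::io::Result<Vec<u8>> {
        // read::GzDecoder wraps any Read and buffers the input internally.
        let mut via_read = Vec::new();
        flate2::read::GzDecoder::new(gz_bytes).read_to_end(&mut via_read)?;

        // bufread::GzDecoder wraps any BufRead and skips that internal buffer;
        // &[u8] already implements BufRead, so no explicit BufReader is needed.
        let mut via_bufread = Vec::new();
        flate2::bufread::GzDecoder::new(gz_bytes).read_to_end(&mut via_bufread)?;

        assert_eq!(via_read, via_bufread); // identical output either way
        Ok(via_read)
    }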

 use serde::{Deserialize, Serialize};
 use serde_repr::{Deserialize_repr, Serialize_repr};
 use uuid::Uuid;
@@ -426,9 +428,30 @@ impl TableMetadata
         file_io: &FileIO,
         metadata_location: impl AsRef<str>,
     ) -> Result<TableMetadata> {
+        let metadata_location = metadata_location.as_ref();
         let input_file = file_io.new_input(metadata_location)?;
         let metadata_content = input_file.read().await?;
-        let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?;
 
+        // Check if the file is compressed by looking for the gzip "magic number".
+        let metadata = if metadata_content.len() > 2
+            && metadata_content[0] == 0x1F
+            && metadata_content[1] == 0x8B
+        {
[Review thread]
Contributor (reviewer): Add a debug log here to explain why we chose to try to decompress it?

Contributor (author): Would you like me to pull in a dependency? Neither tracing nor log is available here.

Contributor (reviewer): I think tracing is already there; anyway, I think the error message is good enough.

+            let mut decoder = GzDecoder::new(metadata_content.as_ref());
+            let mut decompressed_data = Vec::new();
+            decoder.read_to_end(&mut decompressed_data).map_err(|e| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Trying to read compressed metadata file",
+                )
+                .with_context("file_path", metadata_location)
+                .with_source(e)
+            })?;
+            serde_json::from_slice(&decompressed_data)?
+        } else {
+            serde_json::from_slice(&metadata_content)?
+        };
 
         Ok(metadata)
     }
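For reference, the debug log floated in the review thread above might have looked roughly like this, assuming the log crate were available in this module (the thread notes it is not wired up, and settles for the error context instead):

    // Hypothetical sketch, not part of the merged patch:
    log::debug!(
        "metadata file {metadata_location} starts with the gzip magic bytes, attempting decompression"
    );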

@@ -1516,6 +1539,7 @@ impl SnapshotLog
 mod tests {
     use std::collections::HashMap;
     use std::fs;
+    use std::io::Write as _;
     use std::sync::Arc;
 
     use anyhow::Result;
@@ -3524,6 +3548,30 @@ mod tests
         assert_eq!(read_metadata, original_metadata);
     }
 
+    #[tokio::test]
+    async fn test_table_metadata_read_compressed() {
+        let temp_dir = TempDir::new().unwrap();
+        let metadata_location = temp_dir.path().join("v1.gz.metadata.json");
+
+        let original_metadata: TableMetadata = get_test_table_metadata("TableMetadataV2Valid.json");
+        let json = serde_json::to_string(&original_metadata).unwrap();
+
+        let mut encoder = flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
+        encoder.write_all(json.as_bytes()).unwrap();
+        std::fs::write(&metadata_location, encoder.finish().unwrap())
+            .expect("failed to write metadata");
+
+        // Read the metadata back
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let metadata_location = metadata_location.to_str().unwrap();
+        let read_metadata = TableMetadata::read_from(&file_io, metadata_location)
+            .await
+            .unwrap();
+
+        // Verify the metadata matches
+        assert_eq!(read_metadata, original_metadata);
+    }
+
     #[tokio::test]
     async fn test_table_metadata_read_nonexistent_file() {
         // Create a FileIO instance
4 changes: 1 addition & 3 deletions crates/iceberg/src/table.rs
@@ -297,9 +297,7 @@ impl StaticTable
         table_ident: TableIdent,
         file_io: FileIO,
     ) -> Result<Self> {
-        let metadata_file = file_io.new_input(metadata_location)?;
-        let metadata_file_content = metadata_file.read().await?;
-        let metadata = serde_json::from_slice::<TableMetadata>(&metadata_file_content)?;
+        let metadata = TableMetadata::read_from(&file_io, metadata_location).await?;
 
         let table = Table::builder()
             .metadata(metadata)
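With this change, gzip-compressed metadata files work through StaticTable as well. A minimal usage sketch, assuming the enclosing constructor is StaticTable::from_metadata_file (the hunk header truncates the method name) and an illustrative local path:

    use iceberg::io::FileIOBuilder;
    use iceberg::table::StaticTable;
    use iceberg::{Result, TableIdent};

    async fn load_static_table() -> Result<()> {
        let file_io = FileIOBuilder::new_fs_io().build()?;
        let ident = TableIdent::from_strs(["ns", "table"])?;
        // Plain and gzip-compressed metadata files are now handled alike.
        let table = StaticTable::from_metadata_file("/tmp/v1.gz.metadata.json", ident, file_io)
            .await?;
        let _ = table;
        Ok(())
    }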