Skip to content

Commit b6de5db

Browse files
emkornfieldkevinjqliuXuanwo
authored
feat: Honor compression settings for metadata.json on write (#1876)
## Which issue does this PR close? Split off from #1851 - Partially fixes #1731. ## What changes are included in this PR? This change honors the compression setting for metadata.json file (`write.metadata.compression-codec`). ## Are these changes tested? Add unit test to verify files are gzipped when the flag is enabled. BREAKING CHANGE: Make `write_to` take `MetadataLocation` --------- Co-authored-by: Kevin Liu <kevinjqliu@users.noreply.github.com> Co-authored-by: Xuanwo <github@xuanwo.io>
1 parent 7ef4063 commit b6de5db

File tree

13 files changed

+655
-76
lines changed

13 files changed

+655
-76
lines changed

crates/catalog/glue/src/catalog.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::collections::HashMap;
1919
use std::fmt::Debug;
20+
use std::str::FromStr;
2021
use std::sync::Arc;
2122

2223
use anyhow::anyhow;
@@ -550,14 +551,14 @@ impl Catalog for GlueCatalog {
550551
let metadata = TableMetadataBuilder::from_table_creation(creation)?
551552
.build()?
552553
.metadata;
553-
let metadata_location =
554-
MetadataLocation::new_with_table_location(location.clone()).to_string();
554+
let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
555555

556556
metadata.write_to(&self.file_io, &metadata_location).await?;
557557

558+
let metadata_location_str = metadata_location.to_string();
558559
let glue_table = convert_to_glue_table(
559560
&table_name,
560-
metadata_location.clone(),
561+
metadata_location_str.clone(),
561562
&metadata,
562563
metadata.properties(),
563564
None,
@@ -575,7 +576,7 @@ impl Catalog for GlueCatalog {
575576

576577
Table::builder()
577578
.file_io(self.file_io())
578-
.metadata_location(metadata_location)
579+
.metadata_location(metadata_location_str)
579580
.metadata(metadata)
580581
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
581582
.build()
@@ -813,12 +814,13 @@ impl Catalog for GlueCatalog {
813814
let current_metadata_location = current_table.metadata_location_result()?.to_string();
814815

815816
let staged_table = commit.apply(current_table)?;
816-
let staged_metadata_location = staged_table.metadata_location_result()?;
817+
let staged_metadata_location_str = staged_table.metadata_location_result()?;
818+
let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
817819

818820
// Write new metadata
819821
staged_table
820822
.metadata()
821-
.write_to(staged_table.file_io(), staged_metadata_location)
823+
.write_to(staged_table.file_io(), &staged_metadata_location)
822824
.await?;
823825

824826
// Persist staged table to Glue with optimistic locking

crates/catalog/glue/src/utils.rs

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -306,8 +306,6 @@ mod tests {
306306
fn test_convert_to_glue_table() -> Result<()> {
307307
let table_name = "my_table".to_string();
308308
let location = "s3a://warehouse/hive".to_string();
309-
let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
310-
let properties = HashMap::new();
311309
let schema = Schema::builder()
312310
.with_schema_id(1)
313311
.with_fields(vec![
@@ -316,6 +314,8 @@ mod tests {
316314
.build()?;
317315

318316
let metadata = create_metadata(schema)?;
317+
let metadata_location =
318+
MetadataLocation::new_with_metadata(location, &metadata).to_string();
319319

320320
let parameters = HashMap::from([
321321
(ICEBERG_FIELD_ID.to_string(), "1".to_string()),
@@ -336,8 +336,13 @@ mod tests {
336336
.location(metadata.location())
337337
.build();
338338

339-
let result =
340-
convert_to_glue_table(&table_name, metadata_location, &metadata, &properties, None)?;
339+
let result = convert_to_glue_table(
340+
&table_name,
341+
metadata_location,
342+
&metadata,
343+
metadata.properties(),
344+
None,
345+
)?;
341346

342347
assert_eq!(result.name(), &table_name);
343348
assert_eq!(result.description(), None);

crates/catalog/hms/src/catalog.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -463,17 +463,17 @@ impl Catalog for HmsCatalog {
463463
.build()?
464464
.metadata;
465465

466-
let metadata_location =
467-
MetadataLocation::new_with_table_location(location.clone()).to_string();
466+
let metadata_location = MetadataLocation::new_with_metadata(location.clone(), &metadata);
468467

469468
metadata.write_to(&self.file_io, &metadata_location).await?;
470469

470+
let metadata_location_str = metadata_location.to_string();
471471
let hive_table = convert_to_hive_table(
472472
db_name.clone(),
473473
metadata.current_schema(),
474474
table_name.clone(),
475475
location,
476-
metadata_location.clone(),
476+
metadata_location_str.clone(),
477477
metadata.properties(),
478478
)?;
479479

@@ -485,7 +485,7 @@ impl Catalog for HmsCatalog {
485485

486486
Table::builder()
487487
.file_io(self.file_io())
488-
.metadata_location(metadata_location)
488+
.metadata_location(metadata_location_str)
489489
.metadata(metadata)
490490
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
491491
.build()

crates/catalog/hms/src/utils.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -311,8 +311,8 @@ fn get_current_time() -> Result<i32> {
311311

312312
#[cfg(test)]
313313
mod tests {
314-
use iceberg::spec::{NestedField, PrimitiveType, Type};
315-
use iceberg::{MetadataLocation, Namespace, NamespaceIdent};
314+
use iceberg::spec::{NestedField, PrimitiveType, TableMetadataBuilder, Type};
315+
use iceberg::{MetadataLocation, Namespace, NamespaceIdent, TableCreation};
316316

317317
use super::*;
318318

@@ -343,8 +343,6 @@ mod tests {
343343
let db_name = "my_db".to_string();
344344
let table_name = "my_table".to_string();
345345
let location = "s3a://warehouse/hms".to_string();
346-
let metadata_location =
347-
MetadataLocation::new_with_table_location(location.clone()).to_string();
348346
let properties = HashMap::new();
349347
let schema = Schema::builder()
350348
.with_schema_id(1)
@@ -354,6 +352,18 @@ mod tests {
354352
])
355353
.build()?;
356354

355+
let table_creation = TableCreation::builder()
356+
.name(table_name.clone())
357+
.location(location.clone())
358+
.schema(schema.clone())
359+
.properties(properties.clone())
360+
.build();
361+
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
362+
.build()?
363+
.metadata;
364+
let metadata_location =
365+
MetadataLocation::new_with_metadata(location.clone(), &metadata).to_string();
366+
357367
let result = convert_to_hive_table(
358368
db_name.clone(),
359369
&schema,

crates/catalog/s3tables/src/catalog.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
use std::collections::HashMap;
1919
use std::future::Future;
20+
use std::str::FromStr;
2021
use std::sync::Arc;
2122

2223
use async_trait::async_trait;
@@ -501,25 +502,25 @@ impl Catalog for S3TablesCatalog {
501502
let metadata = TableMetadataBuilder::from_table_creation(creation)?
502503
.build()?
503504
.metadata;
504-
let metadata_location =
505-
MetadataLocation::new_with_table_location(table_location).to_string();
505+
let metadata_location = MetadataLocation::new_with_metadata(table_location, &metadata);
506506
metadata.write_to(&self.file_io, &metadata_location).await?;
507507

508508
// update metadata location
509+
let metadata_location_str = metadata_location.to_string();
509510
self.s3tables_client
510511
.update_table_metadata_location()
511512
.table_bucket_arn(self.config.table_bucket_arn.clone())
512513
.namespace(namespace.to_url_string())
513514
.name(table_ident.name())
514-
.metadata_location(metadata_location.clone())
515+
.metadata_location(metadata_location_str.clone())
515516
.version_token(create_resp.version_token())
516517
.send()
517518
.await
518519
.map_err(from_aws_sdk_error)?;
519520

520521
let table = Table::builder()
521522
.identifier(table_ident)
522-
.metadata_location(metadata_location)
523+
.metadata_location(metadata_location_str)
523524
.metadata(metadata)
524525
.file_io(self.file_io.clone())
525526
.build()?;
@@ -630,11 +631,12 @@ impl Catalog for S3TablesCatalog {
630631
self.load_table_with_version_token(&table_ident).await?;
631632

632633
let staged_table = commit.apply(current_table)?;
633-
let staged_metadata_location = staged_table.metadata_location_result()?;
634+
let staged_metadata_location_str = staged_table.metadata_location_result()?;
635+
let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
634636

635637
staged_table
636638
.metadata()
637-
.write_to(staged_table.file_io(), staged_metadata_location)
639+
.write_to(staged_table.file_io(), &staged_metadata_location)
638640
.await?;
639641

640642
let builder = self
@@ -644,7 +646,7 @@ impl Catalog for S3TablesCatalog {
644646
.namespace(table_namespace.to_url_string())
645647
.name(table_ident.name())
646648
.version_token(version_token)
647-
.metadata_location(staged_metadata_location);
649+
.metadata_location(staged_metadata_location_str);
648650

649651
let _ = builder.send().await.map_err(|e| {
650652
let error = e.into_service_error();

crates/catalog/sql/src/catalog.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -851,21 +851,22 @@ impl Catalog for SqlCatalog {
851851
.build()?
852852
.metadata;
853853
let tbl_metadata_location =
854-
MetadataLocation::new_with_table_location(location.clone()).to_string();
854+
MetadataLocation::new_with_metadata(location.clone(), &tbl_metadata);
855855

856856
tbl_metadata
857857
.write_to(&self.fileio, &tbl_metadata_location)
858858
.await?;
859859

860+
let tbl_metadata_location_str = tbl_metadata_location.to_string();
860861
self.execute(&format!(
861862
"INSERT INTO {CATALOG_TABLE_NAME}
862863
({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE})
863864
VALUES (?, ?, ?, ?, ?)
864-
"), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
865+
"), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location_str), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?;
865866

866867
Ok(Table::builder()
867868
.file_io(self.fileio.clone())
868-
.metadata_location(tbl_metadata_location)
869+
.metadata_location(tbl_metadata_location_str)
869870
.identifier(tbl_ident)
870871
.metadata(tbl_metadata)
871872
.build()?)
@@ -949,13 +950,15 @@ impl Catalog for SqlCatalog {
949950
let current_metadata_location = current_table.metadata_location_result()?.to_string();
950951

951952
let staged_table = commit.apply(current_table)?;
952-
let staged_metadata_location = staged_table.metadata_location_result()?;
953+
let staged_metadata_location_str = staged_table.metadata_location_result()?;
954+
let staged_metadata_location = MetadataLocation::from_str(staged_metadata_location_str)?;
953955

954956
staged_table
955957
.metadata()
956958
.write_to(staged_table.file_io(), &staged_metadata_location)
957959
.await?;
958960

961+
let staged_metadata_location_str = staged_metadata_location.to_string();
959962
let update_result = self
960963
.execute(
961964
&format!(
@@ -971,7 +974,7 @@ impl Catalog for SqlCatalog {
971974
AND {CATALOG_FIELD_METADATA_LOCATION_PROP} = ?"
972975
),
973976
vec![
974-
Some(staged_metadata_location),
977+
Some(&staged_metadata_location_str),
975978
Some(current_metadata_location.as_str()),
976979
Some(&self.name),
977980
Some(table_ident.name()),

crates/iceberg/src/catalog/memory/catalog.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
//! This module contains memory catalog implementation.
1919
2020
use std::collections::HashMap;
21+
use std::str::FromStr;
2122
use std::sync::Arc;
2223

2324
use async_trait::async_trait;
@@ -295,15 +296,15 @@ impl Catalog for MemoryCatalog {
295296
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
296297
.build()?
297298
.metadata;
298-
let metadata_location = MetadataLocation::new_with_table_location(location).to_string();
299+
let metadata_location = MetadataLocation::new_with_metadata(location, &metadata);
299300

300301
metadata.write_to(&self.file_io, &metadata_location).await?;
301302

302-
root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
303+
root_namespace_state.insert_new_table(&table_ident, metadata_location.to_string())?;
303304

304305
Table::builder()
305306
.file_io(self.file_io.clone())
306-
.metadata_location(metadata_location)
307+
.metadata_location(metadata_location.to_string())
307308
.metadata(metadata)
308309
.identifier(table_ident)
309310
.build()
@@ -381,12 +382,11 @@ impl Catalog for MemoryCatalog {
381382
let staged_table = commit.apply(current_table)?;
382383

383384
// Write table metadata to the new location
385+
let metadata_location =
386+
MetadataLocation::from_str(staged_table.metadata_location_result()?)?;
384387
staged_table
385388
.metadata()
386-
.write_to(
387-
staged_table.file_io(),
388-
staged_table.metadata_location_result()?,
389-
)
389+
.write_to(staged_table.file_io(), &metadata_location)
390390
.await?;
391391

392392
// Flip the pointer to reference the new metadata file.

0 commit comments

Comments
 (0)