From 87260d09828385ff5c0ba7e03cddd559b2a65c43 Mon Sep 17 00:00:00 2001 From: Harrison Crosse Date: Mon, 30 Mar 2026 14:12:06 -0400 Subject: [PATCH 1/2] feat: make root schema repetition configurable via table properties Add write.parquet.root-repetition property (required/optional/repeated, default: required) to control the Parquet root schema element's repetition type. arrow-go defaults to Repeated, which Snowflake interprets as one-level list encoding and rejects files with list columns. Defaulting to Required aligns with the Parquet spec and matches arrow-rs, pyarrow, and parquet-java behavior. --- table/internal/parquet_files.go | 19 ++++++++++++++++++- table/properties.go | 2 ++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/table/internal/parquet_files.go b/table/internal/parquet_files.go index 040344a15..123d5929c 100644 --- a/table/internal/parquet_files.go +++ b/table/internal/parquet_files.go @@ -65,6 +65,8 @@ const ( ParquetBloomFilterMaxBytesKey = "write.parquet.bloom-filter-max-bytes" ParquetBloomFilterMaxBytesDefault = 1024 * 1024 ParquetBloomFilterColumnEnabledKeyPrefix = "write.parquet.bloom-filter-enabled.column" + ParquetRootRepetitionKey = "write.parquet.root-repetition" + ParquetRootRepetitionDefault = "required" ) type parquetFormat struct{} @@ -256,8 +258,23 @@ func (parquetFormat) GetWriteProperties(props iceberg.Properties) any { slog.Warn("unrecognized compression codec, falling back to uncompressed", "codec", compression) } + var rootRepetition parquet.Repetition + switch props.Get(ParquetRootRepetitionKey, ParquetRootRepetitionDefault) { + case "required": + rootRepetition = parquet.Repetitions.Required + case "optional": + rootRepetition = parquet.Repetitions.Optional + case "repeated": + rootRepetition = parquet.Repetitions.Repeated + default: + slog.Warn("unrecognized root repetition, falling back to required", + "repetition", props.Get(ParquetRootRepetitionKey, ParquetRootRepetitionDefault)) + rootRepetition = parquet.Repetitions.Required + } + return append(writerProps, parquet.WithCompression(codec), - parquet.WithCompressionLevel(compressionLevel)) + parquet.WithCompressionLevel(compressionLevel), + parquet.WithRootRepetition(rootRepetition)) } func (p parquetFormat) WriteDataFile(ctx context.Context, fs iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches []arrow.RecordBatch) (iceberg.DataFile, error) { diff --git a/table/properties.go b/table/properties.go index ae9ed0934..dafebdfb5 100644 --- a/table/properties.go +++ b/table/properties.go @@ -56,6 +56,8 @@ const ( ParquetBloomFilterMaxBytesKey = internal.ParquetBloomFilterMaxBytesKey ParquetBloomFilterMaxBytesDefault = internal.ParquetBloomFilterMaxBytesDefault ParquetBloomFilterColumnEnabledKeyPrefix = internal.ParquetBloomFilterColumnEnabledKeyPrefix + ParquetRootRepetitionKey = internal.ParquetRootRepetitionKey + ParquetRootRepetitionDefault = internal.ParquetRootRepetitionDefault ManifestMergeEnabledKey = "commit.manifest-merge.enabled" ManifestMergeEnabledDefault = false From 21d1ae4450bda9789bef4a71a80a847302283038 Mon Sep 17 00:00:00 2001 From: Harrison Crosse Date: Mon, 30 Mar 2026 14:13:46 -0400 Subject: [PATCH 2/2] fix: goroutine leak in positionDeleteRecordsToDataFiles partitioned path iter.Pull(args.counter) was called unconditionally, but in the partitioned path newWriterFactory creates its own iter.Pull and the original stopCount was never called, leaking one goroutine per write. Move iter.Pull into the unpartitioned branch where it is actually used. Add a regression test confirming goroutine count stays stable. --- table/arrow_utils.go | 2 +- ...s_delete_partitioned_fanout_writer_test.go | 72 +++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/table/arrow_utils.go b/table/arrow_utils.go index fd1d3cb62..a3472d665 100644 --- a/table/arrow_utils.go +++ b/table/arrow_utils.go @@ -1601,8 +1601,8 @@ func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string, cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) { return newPositionDeleteWriter(rootLocation, fs, meta, props, opts...) }, withSchemaSanitization(false)) - nextCount, stopCount := iter.Pull(args.counter) if latestMetadata.PartitionSpec().IsUnpartitioned() { + nextCount, stopCount := iter.Pull(args.counter) tasks := func(yield func(WriteTask) bool) { defer stopCount() diff --git a/table/pos_delete_partitioned_fanout_writer_test.go b/table/pos_delete_partitioned_fanout_writer_test.go index c0a75dab8..ff4143304 100644 --- a/table/pos_delete_partitioned_fanout_writer_test.go +++ b/table/pos_delete_partitioned_fanout_writer_test.go @@ -22,6 +22,7 @@ import ( "errors" "fmt" "maps" + "runtime" "slices" "strings" "testing" @@ -245,6 +246,77 @@ func TestPositionDeletePartitionedFanoutWriterPartitionPathIsDeterministic(t *te require.Contains(t, seen, expectedPath) } +func TestPositionDeletePartitionedNoGoroutineLeak(t *testing.T) { + t.Parallel() + + partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{ + SourceID: 2, + Name: "age_bucket", + Transform: iceberg.BucketTransform{ + NumBuckets: 2, + }, + }) + + metadataBuilder, err := NewMetadataBuilder(2) + require.NoError(t, err) + err = metadataBuilder.AddSchema(iceberg.NewSchema(0, append(iceberg.PositionalDeleteSchema.Fields(), iceberg.NestedField{Name: "age", ID: 2, Type: iceberg.Int64Type{}})...)) + require.NoError(t, err) + err = metadataBuilder.SetCurrentSchemaID(0) + require.NoError(t, err) + err = metadataBuilder.AddPartitionSpec(&partitionSpec, true) + require.NoError(t, err) + err = metadataBuilder.SetDefaultSpecID(0) + require.NoError(t, err) + sortOrder, err := NewSortOrder(1, []SortField{{ + SourceID: 2, + Direction: SortASC, + Transform: iceberg.IdentityTransform{}, + NullOrder: NullsFirst, + }}) + require.NoError(t, err) + err = metadataBuilder.AddSortOrder(&sortOrder) + require.NoError(t, err) + err = metadataBuilder.SetDefaultSortOrderID(1) + require.NoError(t, err) + + tmpDir := t.TempDir() + + // Allow goroutines from prior tests to settle. + runtime.GC() + time.Sleep(50 * time.Millisecond) + before := runtime.NumGoroutine() + + iterations := 50 + for range iterations { + writeUUID := uuid.New() + emptyItr := func(yield func(arrow.RecordBatch, error) bool) {} + + itr := positionDeleteRecordsToDataFiles(t.Context(), tmpDir, metadataBuilder, + map[string]partitionContext{}, recordWritingArgs{ + sc: PositionalDeleteArrowSchema, + itr: emptyItr, + fs: &io.LocalFS{}, + writeUUID: &writeUUID, + counter: internal.Counter(0), + }) + + for range itr { + } + } + + // Allow leaked goroutines to appear. + runtime.GC() + time.Sleep(50 * time.Millisecond) + after := runtime.NumGoroutine() + + // Before the fix, each iteration leaked a goroutine from iter.Pull(args.counter) + // being called unconditionally but stopCount never invoked in the partitioned path. + // Allow a small margin for background runtime goroutines. + assert.LessOrEqual(t, after, before+5, + "expected no goroutine growth after %d iterations, got %d -> %d (delta: %d)", + iterations, before, after, after-before) +} + func onlyContext(ctx context.Context, _ func()) context.Context { return ctx }