Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion table/arrow_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1601,8 +1601,8 @@ func positionDeleteRecordsToDataFiles(ctx context.Context, rootLocation string,
cw := newConcurrentDataFileWriter(func(rootLocation string, fs iceio.WriteFileIO, meta *MetadataBuilder, props iceberg.Properties, opts ...dataFileWriterOption) (dataFileWriter, error) {
return newPositionDeleteWriter(rootLocation, fs, meta, props, opts...)
}, withSchemaSanitization(false))
nextCount, stopCount := iter.Pull(args.counter)
if latestMetadata.PartitionSpec().IsUnpartitioned() {
nextCount, stopCount := iter.Pull(args.counter)
tasks := func(yield func(WriteTask) bool) {
defer stopCount()

Expand Down
19 changes: 18 additions & 1 deletion table/internal/parquet_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ const (
ParquetBloomFilterMaxBytesKey = "write.parquet.bloom-filter-max-bytes"
ParquetBloomFilterMaxBytesDefault = 1024 * 1024
ParquetBloomFilterColumnEnabledKeyPrefix = "write.parquet.bloom-filter-enabled.column"
ParquetRootRepetitionKey = "write.parquet.root-repetition"
ParquetRootRepetitionDefault = "required"
)

type parquetFormat struct{}
Expand Down Expand Up @@ -256,8 +258,23 @@ func (parquetFormat) GetWriteProperties(props iceberg.Properties) any {
slog.Warn("unrecognized compression codec, falling back to uncompressed", "codec", compression)
}

var rootRepetition parquet.Repetition
switch props.Get(ParquetRootRepetitionKey, ParquetRootRepetitionDefault) {
case "required":
rootRepetition = parquet.Repetitions.Required
case "optional":
rootRepetition = parquet.Repetitions.Optional
case "repeated":
rootRepetition = parquet.Repetitions.Repeated
default:
slog.Warn("unrecognized root repetition, falling back to required",
"repetition", props.Get(ParquetRootRepetitionKey, ParquetRootRepetitionDefault))
rootRepetition = parquet.Repetitions.Required
}

return append(writerProps, parquet.WithCompression(codec),
parquet.WithCompressionLevel(compressionLevel))
parquet.WithCompressionLevel(compressionLevel),
parquet.WithRootRepetition(rootRepetition))
}

func (p parquetFormat) WriteDataFile(ctx context.Context, fs iceio.WriteFileIO, partitionValues map[int]any, info WriteFileInfo, batches []arrow.RecordBatch) (iceberg.DataFile, error) {
Expand Down
72 changes: 72 additions & 0 deletions table/pos_delete_partitioned_fanout_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"maps"
"runtime"
"slices"
"strings"
"testing"
Expand Down Expand Up @@ -245,6 +246,77 @@ func TestPositionDeletePartitionedFanoutWriterPartitionPathIsDeterministic(t *te
require.Contains(t, seen, expectedPath)
}

func TestPositionDeletePartitionedNoGoroutineLeak(t *testing.T) {
t.Parallel()

partitionSpec := iceberg.NewPartitionSpec(iceberg.PartitionField{
SourceID: 2,
Name: "age_bucket",
Transform: iceberg.BucketTransform{
NumBuckets: 2,
},
})

metadataBuilder, err := NewMetadataBuilder(2)
require.NoError(t, err)
err = metadataBuilder.AddSchema(iceberg.NewSchema(0, append(iceberg.PositionalDeleteSchema.Fields(), iceberg.NestedField{Name: "age", ID: 2, Type: iceberg.Int64Type{}})...))
require.NoError(t, err)
err = metadataBuilder.SetCurrentSchemaID(0)
require.NoError(t, err)
err = metadataBuilder.AddPartitionSpec(&partitionSpec, true)
require.NoError(t, err)
err = metadataBuilder.SetDefaultSpecID(0)
require.NoError(t, err)
sortOrder, err := NewSortOrder(1, []SortField{{
SourceID: 2,
Direction: SortASC,
Transform: iceberg.IdentityTransform{},
NullOrder: NullsFirst,
}})
require.NoError(t, err)
err = metadataBuilder.AddSortOrder(&sortOrder)
require.NoError(t, err)
err = metadataBuilder.SetDefaultSortOrderID(1)
require.NoError(t, err)

tmpDir := t.TempDir()

// Allow goroutines from prior tests to settle.
runtime.GC()
time.Sleep(50 * time.Millisecond)
before := runtime.NumGoroutine()

iterations := 50
for range iterations {
writeUUID := uuid.New()
emptyItr := func(yield func(arrow.RecordBatch, error) bool) {}

itr := positionDeleteRecordsToDataFiles(t.Context(), tmpDir, metadataBuilder,
map[string]partitionContext{}, recordWritingArgs{
sc: PositionalDeleteArrowSchema,
itr: emptyItr,
fs: &io.LocalFS{},
writeUUID: &writeUUID,
counter: internal.Counter(0),
})

for range itr {
}
}

// Allow leaked goroutines to appear.
runtime.GC()
time.Sleep(50 * time.Millisecond)
after := runtime.NumGoroutine()

// Before the fix, each iteration leaked a goroutine from iter.Pull(args.counter)
// being called unconditionally but stopCount never invoked in the partitioned path.
// Allow a small margin for background runtime goroutines.
assert.LessOrEqual(t, after, before+5,
"expected no goroutine growth after %d iterations, got %d -> %d (delta: %d)",
iterations, before, after, after-before)
}

func onlyContext(ctx context.Context, _ func()) context.Context {
return ctx
}
Expand Down
2 changes: 2 additions & 0 deletions table/properties.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ const (
ParquetBloomFilterMaxBytesKey = internal.ParquetBloomFilterMaxBytesKey
ParquetBloomFilterMaxBytesDefault = internal.ParquetBloomFilterMaxBytesDefault
ParquetBloomFilterColumnEnabledKeyPrefix = internal.ParquetBloomFilterColumnEnabledKeyPrefix
ParquetRootRepetitionKey = internal.ParquetRootRepetitionKey
ParquetRootRepetitionDefault = internal.ParquetRootRepetitionDefault

ManifestMergeEnabledKey = "commit.manifest-merge.enabled"
ManifestMergeEnabledDefault = false
Expand Down