diff --git a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java index 3cb46b309d82..0cc391311f48 100644 --- a/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java +++ b/data/src/test/java/org/apache/iceberg/data/TestMetricsRowGroupFilter.java @@ -48,6 +48,7 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.iceberg.FileFormat; diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java index 16ca29ff4053..09626b97c319 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReader.java @@ -183,6 +183,11 @@ public VortexValueReader primitive(Type.PrimitiveType iPrimitive, Field primF return simpleReader(arrowType); } + @Override + public VortexValueReader variant(Types.VariantType variantType, Field variantField) { + return GenericVortexReaders.variants(); + } + private static VortexValueReader simpleReader(ArrowType arrowType) { if (arrowType instanceof ArrowType.Bool) { return GenericVortexReaders.bools(); diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java index 26df6752eec7..92989eccf429 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexReaders.java @@ -20,6 +20,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.time.Instant; import java.time.LocalDate; @@ -53,6 +54,9 @@ import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantValue; import org.apache.iceberg.vortex.VortexValueReader; public class GenericVortexReaders { @@ -94,6 +98,10 @@ public static VortexValueReader uuids() { return UuidReader.INSTANCE; } + public static VortexValueReader variants() { + return VariantReader.INSTANCE; + } + public static VortexValueReader date(boolean isMillis) { return new DateReader(isMillis); } @@ -314,6 +322,77 @@ static FixedSizeBinaryVector uuidStorage(FieldVector vector) { return (FixedSizeBinaryVector) vector; } + private static class VariantReader implements VortexValueReader { + static final VariantReader INSTANCE = new VariantReader(); + + private VariantReader() {} + + @Override + public Variant read(FieldVector vector, int row) { + StructVector storage = variantStorage(vector); + VarBinaryVector valueVector = storage.getChild("value", VarBinaryVector.class); + if (vector.isNull(row) || isMissingBinary(valueVector, row)) { + FieldVector typedValueVector = (FieldVector) storage.getChild("typed_value"); + if (typedValueVector != null && !typedValueVector.isNull(row)) { + throw new UnsupportedOperationException( + "Reading shredded Variant values from Vortex is not supported yet"); + } + + return null; + } + + return readVariant(storage, valueVector, row); + } + + @Override + public Variant readNonNull(FieldVector vector, int row) { + StructVector storage = variantStorage(vector); + VarBinaryVector valueVector = storage.getChild("value", VarBinaryVector.class); + if (isMissingBinary(valueVector, row)) { + throw new UnsupportedOperationException( + "Reading shredded Variant values from Vortex is not supported yet"); + } + + return readVariant(storage, valueVector, row); + } + + private Variant readVariant(StructVector storage, VarBinaryVector valueVector, int row) { + VarBinaryVector metadataVector = storage.getChild("metadata", VarBinaryVector.class); + + if (metadataVector == null || metadataVector.isNull(row)) { + throw new IllegalStateException("Invalid Vortex variant: metadata is null"); + } + + byte[] metadataBytes = metadataVector.get(row); + byte[] valueBytes = valueVector.get(row); + if (metadataBytes.length == 0 || valueBytes.length == 0) { + throw new IllegalStateException( + "Invalid Vortex variant: serialized value is empty (metadata=" + + metadataBytes.length + + ", value=" + + valueBytes.length + + ")"); + } + + VariantMetadata metadata = + VariantMetadata.from(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN)); + VariantValue value = + VariantValue.from(metadata, ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN)); + return Variant.of(metadata, value); + } + } + + private static boolean isMissingBinary(VarBinaryVector vector, int row) { + return vector == null || vector.isNull(row) || vector.get(row).length == 0; + } + + private static StructVector variantStorage(FieldVector vector) { + if (vector instanceof ExtensionTypeVector ext) { + return (StructVector) ext.getUnderlyingVector(); + } + return (StructVector) vector; + } + private static class DateReader implements VortexValueReader { private final boolean isMillis; diff --git a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java index 6d9346b960a5..6cb27deeb0de 100644 --- a/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java +++ b/vortex/src/main/java/org/apache/iceberg/data/vortex/GenericVortexWriter.java @@ -20,6 +20,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.time.LocalDateTime; @@ -54,9 +55,14 @@ import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.Serialized; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantValue; import org.apache.iceberg.vortex.VortexValueWriter; /** Writes Iceberg generic {@link Record} objects to Arrow vectors for Vortex file output. */ @@ -90,7 +96,12 @@ public void write(Record datum, VectorSchemaRoot root, int rowIndex) { ColumnMetricsTracker tracker = (ColumnMetricsTracker) trackers[fieldIndex]; if (value == null) { - vector.setNull(rowIndex); + if (field.isRequired()) { + throw new IllegalArgumentException( + "Cannot write null value for required field: " + field); + } + + writeNull(vector, field.type(), rowIndex); tracker.addNull(); continue; } @@ -225,6 +236,10 @@ private static void writeValue( } // Mark the struct slot itself as non-null for this row. structVector.setIndexDefined(rowIndex); + break; + case VARIANT: + writeVariant((StructVector) vector, (Variant) value, rowIndex); + break; default: throw new UnsupportedOperationException( @@ -232,6 +247,60 @@ private static void writeValue( } } + private static void writeNull(FieldVector vector, Type type, int rowIndex) { + if (type.isVariantType()) { + writeNullVariant((StructVector) vector, rowIndex); + } else { + vector.setNull(rowIndex); + } + } + + private static void writeNullVariant(StructVector vector, int rowIndex) { + vector.setNull(rowIndex); + writeVariantMetadata( + vector.getChild("metadata", VarBinaryVector.class), VariantMetadata.empty(), rowIndex); + + VarBinaryVector valueVector = vector.getChild("value", VarBinaryVector.class); + if (valueVector != null) { + valueVector.setNull(rowIndex); + } + } + + private static void writeVariant(StructVector vector, Variant variant, int rowIndex) { + vector.setIndexDefined(rowIndex); + + writeVariantMetadata( + vector.getChild("metadata", VarBinaryVector.class), variant.metadata(), rowIndex); + writeVariantValue(vector.getChild("value", VarBinaryVector.class), variant.value(), rowIndex); + } + + private static void writeVariantMetadata( + VarBinaryVector vector, VariantMetadata metadata, int rowIndex) { + if (metadata instanceof Serialized serialized) { + writeSerialized(vector, serialized, rowIndex); + return; + } + + ByteBuffer buffer = ByteBuffer.allocate(metadata.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + int length = metadata.writeTo(buffer, 0); + vector.setSafe(rowIndex, buffer, 0, length); + } + + private static void writeVariantValue(VarBinaryVector vector, VariantValue value, int rowIndex) { + if (value instanceof Serialized serialized) { + writeSerialized(vector, serialized, rowIndex); + return; + } + + ByteBuffer buffer = ByteBuffer.allocate(value.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN); + int length = value.writeTo(buffer, 0); + vector.setSafe(rowIndex, buffer, 0, length); + } + + private static void writeSerialized(VarBinaryVector vector, Serialized serialized, int rowIndex) { + vector.setSafe(rowIndex, ByteBuffers.toByteArray(serialized.buffer())); + } + @SuppressWarnings({"unchecked", "rawtypes"}) private static ColumnMetricsTracker newTracker(Types.NestedField field) { switch (field.type().typeId()) { @@ -274,7 +343,7 @@ private static ColumnMetricsTracker newTracker(Types.NestedField field) { v -> ChronoUnit.NANOS.between(LOCAL_EPOCH, (LocalDateTime) v)); } default: - if (field.type().isNestedType()) { + if (field.type().isNestedType() || field.type().isVariantType()) { // Lists, maps, and structs have no natural ordering — track counts only. return new ColumnMetricsTracker<>(field.fieldId(), null); } @@ -296,6 +365,10 @@ static class ColumnMetricsTracker { private T min; private T max; + ColumnMetricsTracker(int fieldId) { + this(fieldId, null, null); + } + ColumnMetricsTracker(int fieldId, Comparator comparator) { this(fieldId, comparator, null); } diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexMetrics.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexMetrics.java index e098e56377eb..3a4eb7eb0a4c 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexMetrics.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexMetrics.java @@ -90,6 +90,8 @@ static Metrics buildMetrics( } }); + addVariantValueCounts(rowCount, schema, metricsConfig, valueCounts); + return new Metrics( rowCount, null, // columnSizes not available without Vortex JNI support @@ -101,6 +103,19 @@ static Metrics buildMetrics( originalTypes.isEmpty() ? null : originalTypes); } + private static void addVariantValueCounts( + long rowCount, Schema schema, MetricsConfig metricsConfig, Map valueCounts) { + for (Types.NestedField column : schema.columns()) { + int id = column.fieldId(); + MetricsModes.MetricsMode mode = MetricsUtil.metricsMode(schema, metricsConfig, id); + if (column.type().isVariantType() + && mode != MetricsModes.None.get() + && !valueCounts.containsKey(id)) { + valueCounts.put(id, rowCount); + } + } + } + private static int truncateLength(MetricsModes.MetricsMode mode) { if (mode instanceof MetricsModes.Truncate truncate) { return truncate.length(); diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java index 590557e4e8c7..435daab5c657 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemaWithTypeVisitor.java @@ -40,6 +40,8 @@ public abstract class VortexSchemaWithTypeVisitor { public abstract T primitive(Type.PrimitiveType iPrimitive, Field primField); + public abstract T variant(Types.VariantType variantType, Field variantField); + public static T visit( Schema expectedSchema, org.apache.arrow.vector.types.pojo.Schema fileSchema, @@ -48,6 +50,10 @@ public static T visit( } public static T visit(Type iType, Field field, VortexSchemaWithTypeVisitor visitor) { + if ((iType != null && iType.isVariantType()) || VortexSchemas.isVariantField(field)) { + return visitor.variant(iType != null ? iType.asVariantType() : null, field); + } + ArrowType arrowType = field.getType(); if (arrowType instanceof ArrowType.Struct) { return visitStruct(iType != null ? iType.asStructType() : null, field.getChildren(), visitor); diff --git a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java index cab7ca64f792..8db5125e4951 100644 --- a/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java +++ b/vortex/src/main/java/org/apache/iceberg/vortex/VortexSchemas.java @@ -31,6 +31,7 @@ import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -41,6 +42,12 @@ public final class VortexSchemas { /** Canonical Arrow extension name for UUIDs (matches {@code arrow.vector.extension.UuidType}). */ static final String UUID_EXTENSION_NAME = "arrow.uuid"; + /** + * Canonical Arrow extension name for Parquet variant (matches {@code + * arrow.vector.extension.ParquetVariant}). + */ + static final String VARIANT_EXTENSION_NAME = "arrow.parquet.variant"; + private VortexSchemas() {} /** Convert a Vortex file's Arrow {@link org.apache.arrow.vector.types.pojo.Schema} to Iceberg. */ @@ -229,6 +236,25 @@ yield new Field( yield new Field( name, new FieldType(nullable, ArrowType.Struct.INSTANCE, null), children.build()); } + case VARIANT -> { + Map extMetadata = + ImmutableMap.of( + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_NAME, + VARIANT_EXTENSION_NAME, + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_METADATA, + ""); + + ImmutableList.Builder children = ImmutableList.builder(); + children.add( + new Field("metadata", new FieldType(false, ArrowType.Binary.INSTANCE, null), null)); + children.add( + new Field("value", new FieldType(true, ArrowType.Binary.INSTANCE, null), null)); + + yield new Field( + name, + new FieldType(nullable, ArrowType.Struct.INSTANCE, null, extMetadata), + children.build()); + } default -> throw new UnsupportedOperationException( "Unsupported Iceberg type for Arrow conversion: " + type); @@ -459,6 +485,34 @@ yield toVortexArrowField( null, children.build()); } + case VARIANT -> { + Map extMetadata = + ImmutableMap.of( + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_NAME, + VARIANT_EXTENSION_NAME, + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_METADATA, + ""); + + ImmutableList.Builder + children = ImmutableList.builder(); + children.add( + toVortexArrowField( + "metadata", + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), + false)); + children.add( + toVortexArrowField( + "value", + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Binary(), + true)); + + yield toVortexArrowField( + name, + new dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Struct(), + nullable, + extMetadata, + children.build()); + } default -> throw new UnsupportedOperationException( "Unsupported Iceberg type for Arrow conversion: " + type); @@ -492,6 +546,12 @@ private static Type toIcebergType(Field field, AtomicInteger nextId) { if (isUuidField(field)) { return Types.UUIDType.get(); } + + if (isVariantField(field)) { + validateVariantField(field); + return Types.VariantType.get(); + } + ArrowType arrowType = field.getType(); if (arrowType instanceof ArrowType.Int intType) { return intType.getBitWidth() <= Integer.SIZE ? Types.IntegerType.get() : Types.LongType.get(); @@ -518,6 +578,11 @@ private static Type toIcebergType( if (isUuidField(field)) { return Types.UUIDType.get(); } + + if (isVariantField(field)) { + return Types.VariantType.get(); + } + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType arrowType = field.getType(); if (arrowType instanceof dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.Int intType) { @@ -620,6 +685,69 @@ private static Type toIcebergFloatingPoint( }; } + private static void validateVariantField(Field field) { + Preconditions.checkArgument( + field.getType() instanceof ArrowType.Struct, + "Invalid Arrow variant field %s: expected struct storage type, found %s", + field.getName(), + field.getType()); + + Field metadata = findChild(field, "metadata"); + Preconditions.checkArgument( + metadata != null, + "Invalid Arrow variant field %s: missing metadata child", + field.getName()); + Preconditions.checkArgument( + !metadata.isNullable(), + "Invalid Arrow variant field %s: metadata child must be non-nullable", + field.getName()); + Preconditions.checkArgument( + isBinaryLike(metadata.getType()), + "Invalid Arrow variant field %s: metadata child must be binary, found %s", + field.getName(), + metadata.getType()); + + Field value = findChild(field, "value"); + if (value != null) { + Preconditions.checkArgument( + value.isNullable(), + "Invalid Arrow variant field %s: value child must be nullable", + field.getName()); + Preconditions.checkArgument( + isBinaryLike(value.getType()), + "Invalid Arrow variant field %s: value child must be binary, found %s", + field.getName(), + value.getType()); + } + + Field typedValue = findChild(field, "typed_value"); + if (typedValue != null) { + Preconditions.checkArgument( + typedValue.isNullable(), + "Invalid Arrow variant field %s: typed_value child must be nullable", + field.getName()); + } + + Preconditions.checkArgument( + value != null || typedValue != null, + "Invalid Arrow variant field %s: expected value or typed_value child", + field.getName()); + } + + private static Field findChild(Field field, String name) { + for (Field child : field.getChildren()) { + if (name.equals(child.getName())) { + return child; + } + } + + return null; + } + + private static boolean isBinaryLike(ArrowType arrowType) { + return arrowType instanceof ArrowType.Binary || arrowType instanceof ArrowType.LargeBinary; + } + private static Type toIcebergTimestamp(ArrowType.Timestamp tsType) { boolean isNano = tsType.getUnit() == TimeUnit.NANOSECOND; if (tsType.getTimezone() == null) { @@ -684,4 +812,27 @@ public static boolean isUuidField( dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType .EXTENSION_METADATA_KEY_NAME)); } + + public static boolean isVariantField(Field field) { + if (field.getType() instanceof ArrowType.ExtensionType ext) { + return VARIANT_EXTENSION_NAME.equals(ext.extensionName()); + } + return VARIANT_EXTENSION_NAME.equals( + field.getMetadata().get(ArrowType.ExtensionType.EXTENSION_METADATA_KEY_NAME)); + } + + public static boolean isVariantField( + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field field) { + if (field.getType() + instanceof + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType ext) { + return VARIANT_EXTENSION_NAME.equals(ext.extensionName()); + } + return VARIANT_EXTENSION_NAME.equals( + field + .getMetadata() + .get( + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.ArrowType.ExtensionType + .EXTENSION_METADATA_KEY_NAME)); + } } diff --git a/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericVortex.java b/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericVortex.java index 3722a1b6d0f8..2f6bdbc8ea0a 100644 --- a/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericVortex.java +++ b/vortex/src/test/java/org/apache/iceberg/vortex/TestGenericVortex.java @@ -48,7 +48,12 @@ protected boolean supportsUnknown() { @Override protected boolean supportsVariant() { - return false; + return true; + } + + @Override + protected boolean supportsTimestampNanos() { + return true; } @Override diff --git a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexMetrics.java b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexMetrics.java index 948dbad34e4e..a9dd0a9526fe 100644 --- a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexMetrics.java +++ b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexMetrics.java @@ -147,6 +147,24 @@ public void testMetricsCountsMode() { assertThat(metrics.upperBounds()).isNull(); } + @Test + public void testVariantColumnReportsRowCountWithoutBounds() { + Schema variantSchema = + new Schema( + required(1, "id", Types.LongType.get()), + optional(2, "payload", Types.VariantType.get())); + FieldMetrics idMetrics = new FieldMetrics<>(1, 3, 0, 10L, 12L); + + Metrics metrics = + VortexMetrics.buildMetrics( + 3L, variantSchema, MetricsConfig.getDefault(), Stream.of(idMetrics)); + + assertThat(metrics.valueCounts()).containsEntry(2, 3L); + assertThat(metrics.nullValueCounts()).doesNotContainKey(2); + assertThat(metrics.lowerBounds()).doesNotContainKey(2); + assertThat(metrics.upperBounds()).doesNotContainKey(2); + } + @Test public void testMetricsNoneMode() { MetricsConfig noneConfig = diff --git a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java index 9a1e5877ee76..9fef85b660a7 100644 --- a/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java +++ b/vortex/src/test/java/org/apache/iceberg/vortex/TestVortexSchemas.java @@ -21,8 +21,10 @@ import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.List; +import java.util.Map; import org.apache.arrow.vector.types.FloatingPointPrecision; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; @@ -32,7 +34,14 @@ import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; -public class TestVortexSchemas { +class TestVortexSchemas { + private static final Map VARIANT_METADATA = + Map.of( + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_NAME, + VortexSchemas.VARIANT_EXTENSION_NAME, + ArrowType.ExtensionType.EXTENSION_METADATA_KEY_METADATA, + ""); + // A struct nested inside a struct, plus a list, so the conversion exercises every recursive // branch and id is forced to stay unique across siblings, nested structs, and list elements. private static final Schema SCHEMA = @@ -55,17 +64,17 @@ public class TestVortexSchemas { Types.StructType.of(required(9, "x", Types.IntegerType.get())))))); @Test - public void testConvertLocalArrowStructTypes() { + void convertLocalArrowStructTypes() { assertStructRoundTrip(VortexSchemas.convert(VortexSchemas.toArrowSchema(SCHEMA))); } @Test - public void testConvertRelocatedArrowStructTypes() { + void convertRelocatedArrowStructTypes() { assertStructRoundTrip(VortexSchemas.convert(VortexSchemas.toVortexArrowSchema(SCHEMA))); } @Test - public void testConvertLargeAndFixedSizeLists() { + void convertLargeAndFixedSizeLists() { // toArrowSchema only emits ArrowType.List for Iceberg lists, so build the Arrow schema directly // to exercise the LargeList and FixedSizeList branches a real Vortex file can produce. org.apache.arrow.vector.types.pojo.Schema arrowSchema = @@ -88,11 +97,138 @@ public void testConvertLargeAndFixedSizeLists() { assertThat(TypeUtil.indexById(converted.asStruct())).hasSize(4); } + @Test + void variantToArrowUsesCanonicalUnshreddedStorage() { + Schema icebergSchema = + new Schema( + required(1, "id", Types.LongType.get()), optional(2, "v", Types.VariantType.get())); + + Field variant = VortexSchemas.toArrowSchema(icebergSchema).findField("v"); + + assertThat(VortexSchemas.isVariantField(variant)).isTrue(); + assertThat(variant.isNullable()).isTrue(); + assertThat(variant.getType()).isEqualTo(ArrowType.Struct.INSTANCE); + assertThat(variant.getChildren()).hasSize(2); + + Field metadata = variant.getChildren().get(0); + assertThat(metadata.getName()).isEqualTo("metadata"); + assertThat(metadata.isNullable()).isFalse(); + assertThat(metadata.getType()).isEqualTo(ArrowType.Binary.INSTANCE); + + Field value = variant.getChildren().get(1); + assertThat(value.getName()).isEqualTo("value"); + assertThat(value.isNullable()).isTrue(); + assertThat(value.getType()).isEqualTo(ArrowType.Binary.INSTANCE); + } + + @Test + void requiredVariantToVortexArrowKeepsValueChildNullable() { + Schema icebergSchema = + new Schema( + required(1, "id", Types.LongType.get()), required(2, "v", Types.VariantType.get())); + + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field variant = + VortexSchemas.toVortexArrowSchema(icebergSchema).findField("v"); + dev.vortex.relocated.org.apache.arrow.vector.types.pojo.Field value = + variant.getChildren().get(1); + + assertThat(variant.isNullable()).isFalse(); + assertThat(value.getName()).isEqualTo("value"); + assertThat(value.isNullable()).isTrue(); + } + + @Test + void variantFromArrowAcceptsTypedValueOnlyStorage() { + Field variant = + variantField( + "v", + true, + List.of( + binaryField("metadata", false), + new Field( + "typed_value", + new FieldType(true, new ArrowType.Int(Integer.SIZE, true), null), + null))); + + Schema converted = + VortexSchemas.convert(new org.apache.arrow.vector.types.pojo.Schema(List.of(variant))); + + assertThat(converted.columns()).containsExactly(optional(0, "v", Types.VariantType.get())); + } + + @Test + void variantFromArrowRequiresMetadataChild() { + Field variant = variantField("v", true, List.of(binaryField("value", true))); + + assertThatThrownBy( + () -> + VortexSchemas.convert( + new org.apache.arrow.vector.types.pojo.Schema(List.of(variant)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("metadata"); + } + + @Test + void variantFromArrowRequiresValueOrTypedValueChild() { + Field variant = variantField("v", true, List.of(binaryField("metadata", false))); + + assertThatThrownBy( + () -> + VortexSchemas.convert( + new org.apache.arrow.vector.types.pojo.Schema(List.of(variant)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("value or typed_value"); + } + + @Test + void variantFromArrowRequiresNullableValueChild() { + Field variant = + variantField( + "v", true, List.of(binaryField("metadata", false), binaryField("value", false))); + + assertThatThrownBy( + () -> + VortexSchemas.convert( + new org.apache.arrow.vector.types.pojo.Schema(List.of(variant)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("value child must be nullable"); + } + + @Test + void variantFromArrowRequiresNullableTypedValueChild() { + Field variant = + variantField( + "v", + true, + List.of( + binaryField("metadata", false), + new Field( + "typed_value", + new FieldType(false, new ArrowType.Int(Integer.SIZE, true), null), + null))); + + assertThatThrownBy( + () -> + VortexSchemas.convert( + new org.apache.arrow.vector.types.pojo.Schema(List.of(variant)))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("typed_value child must be nullable"); + } + private static Field listField(String name, ArrowType listType, ArrowType elementType) { Field element = new Field("element", new FieldType(false, elementType, null), null); return new Field(name, new FieldType(true, listType, null), List.of(element)); } + private static Field variantField(String name, boolean nullable, List children) { + return new Field( + name, new FieldType(nullable, ArrowType.Struct.INSTANCE, null, VARIANT_METADATA), children); + } + + private static Field binaryField(String name, boolean nullable) { + return new Field(name, new FieldType(nullable, ArrowType.Binary.INSTANCE, null), null); + } + private static void assertStructRoundTrip(Schema roundTrip) { // Names and types survive the round trip through Arrow (binding is by name). assertThat(roundTrip.findField("location").type()).isInstanceOf(Types.StructType.class);