Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.FileFormat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ public VortexValueReader<?> primitive(Type.PrimitiveType iPrimitive, Field primF
return simpleReader(arrowType);
}

/**
 * Supplies the reader used for variant columns.
 *
 * <p>Neither {@code variantType} nor {@code variantField} is consulted here; the result is always
 * the reader returned by {@link GenericVortexReaders#variants()}.
 */
@Override
public VortexValueReader<?> variant(Types.VariantType variantType, Field variantField) {
return GenericVortexReaders.variants();
}

private static VortexValueReader<?> simpleReader(ArrowType arrowType) {
if (arrowType instanceof ArrowType.Bool) {
return GenericVortexReaders.bools();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.LocalDate;
Expand Down Expand Up @@ -53,6 +54,9 @@
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantValue;
import org.apache.iceberg.vortex.VortexValueReader;

public class GenericVortexReaders {
Expand Down Expand Up @@ -94,6 +98,10 @@ public static VortexValueReader<UUID> uuids() {
return UuidReader.INSTANCE;
}

/** Returns the shared {@code VariantReader.INSTANCE}; no new reader is allocated per call. */
public static VortexValueReader<Variant> variants() {
return VariantReader.INSTANCE;
}

/**
 * Creates a new {@code DateReader} configured with {@code isMillis}.
 *
 * @param isMillis forwarded unchanged to the {@code DateReader} constructor; presumably selects
 *     millisecond-based rather than day-based date storage (TODO confirm against DateReader)
 */
public static VortexValueReader<LocalDate> date(boolean isMillis) {
return new DateReader(isMillis);
}
Expand Down Expand Up @@ -314,6 +322,77 @@ static FixedSizeBinaryVector uuidStorage(FieldVector vector) {
return (FixedSizeBinaryVector) vector;
}

/**
 * Reads Iceberg {@link Variant} values from a struct-backed vector whose children are looked up by
 * the names {@code metadata}, {@code value} and, optionally, {@code typed_value}.
 *
 * <p>Only unshredded values are supported: a row whose {@code value} child is missing while its
 * {@code typed_value} child is set is rejected with {@link UnsupportedOperationException}.
 */
private static class VariantReader implements VortexValueReader<Variant> {
  static final VariantReader INSTANCE = new VariantReader();

  private static final String SHREDDED_UNSUPPORTED =
      "Reading shredded Variant values from Vortex is not supported yet";

  private VariantReader() {}

  /**
   * Reads the variant at {@code row}, or returns {@code null} when the row holds no variant.
   *
   * @param vector the variant vector, either a struct vector or an extension vector wrapping one
   * @param row the row index to read
   * @return the decoded variant, or {@code null} if the struct slot is null or the row has
   *     neither a {@code value} nor a {@code typed_value}
   * @throws UnsupportedOperationException if the row is non-null, has no {@code value}, and has a
   *     non-null {@code typed_value}
   * @throws IllegalStateException if the row has a value but its metadata is null or empty
   */
  @Override
  public Variant read(FieldVector vector, int row) {
    // A null struct slot is a null variant. Child slots under a null parent carry no meaning, so
    // they must not be inspected: a stale typed_value entry would otherwise be misreported as an
    // unsupported shredded value instead of null.
    if (vector.isNull(row)) {
      return null;
    }

    StructVector storage = variantStorage(vector);
    VarBinaryVector valueVector = storage.getChild("value", VarBinaryVector.class);
    if (isMissingBinary(valueVector, row)) {
      FieldVector typedValueVector = (FieldVector) storage.getChild("typed_value");
      if (typedValueVector != null && !typedValueVector.isNull(row)) {
        throw new UnsupportedOperationException(SHREDDED_UNSUPPORTED);
      }

      return null;
    }

    return readVariant(storage, valueVector, row);
  }

  /**
   * Reads the variant at {@code row} without checking the struct slot's own validity.
   *
   * @param vector the variant vector, either a struct vector or an extension vector wrapping one
   * @param row the row index to read
   * @return the decoded variant, never {@code null}
   * @throws UnsupportedOperationException if the {@code value} child is absent, null, or empty at
   *     {@code row}
   * @throws IllegalStateException if the metadata is null or empty at {@code row}
   */
  @Override
  public Variant readNonNull(FieldVector vector, int row) {
    StructVector storage = variantStorage(vector);
    VarBinaryVector valueVector = storage.getChild("value", VarBinaryVector.class);
    if (isMissingBinary(valueVector, row)) {
      throw new UnsupportedOperationException(SHREDDED_UNSUPPORTED);
    }

    return readVariant(storage, valueVector, row);
  }

  /**
   * Decodes the metadata and value bytes at {@code row} into a {@link Variant}. Both byte arrays
   * are wrapped as little-endian buffers before being handed to the Iceberg variant parsers.
   */
  private Variant readVariant(StructVector storage, VarBinaryVector valueVector, int row) {
    VarBinaryVector metadataVector = storage.getChild("metadata", VarBinaryVector.class);

    if (metadataVector == null || metadataVector.isNull(row)) {
      throw new IllegalStateException("Invalid Vortex variant: metadata is null");
    }

    byte[] metadataBytes = metadataVector.get(row);
    byte[] valueBytes = valueVector.get(row);
    if (metadataBytes.length == 0 || valueBytes.length == 0) {
      throw new IllegalStateException(
          "Invalid Vortex variant: serialized value is empty (metadata="
              + metadataBytes.length
              + ", value="
              + valueBytes.length
              + ")");
    }

    VariantMetadata metadata =
        VariantMetadata.from(ByteBuffer.wrap(metadataBytes).order(ByteOrder.LITTLE_ENDIAN));
    VariantValue value =
        VariantValue.from(metadata, ByteBuffer.wrap(valueBytes).order(ByteOrder.LITTLE_ENDIAN));
    return Variant.of(metadata, value);
  }
}

/**
 * Returns whether a binary child holds no usable bytes at {@code row}: the child vector is absent,
 * the slot is null, or the slot is zero-length.
 *
 * @param vector the binary child vector, may be {@code null} when the struct has no such child
 * @param row the row index to inspect
 * @return {@code true} if there is nothing to decode at {@code row}
 */
private static boolean isMissingBinary(VarBinaryVector vector, int row) {
  // getValueLength derives the length from the offsets, avoiding the byte[] copy that get(row)
  // would allocate only to have its length inspected.
  return vector == null || vector.isNull(row) || vector.getValueLength(row) == 0;
}

/**
 * Resolves the struct vector backing a variant column: the underlying vector when {@code vector}
 * is an extension vector, otherwise {@code vector} itself. Either way the result is cast to
 * {@link StructVector}.
 */
private static StructVector variantStorage(FieldVector vector) {
  return vector instanceof ExtensionTypeVector<?> extension
      ? (StructVector) extension.getUnderlyingVector()
      : (StructVector) vector;
}

private static class DateReader implements VortexValueReader<LocalDate> {
private final boolean isMillis;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.LocalDateTime;
Expand Down Expand Up @@ -54,9 +55,14 @@
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.iceberg.variants.Serialized;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantValue;
import org.apache.iceberg.vortex.VortexValueWriter;

/** Writes Iceberg generic {@link Record} objects to Arrow vectors for Vortex file output. */
Expand Down Expand Up @@ -90,7 +96,12 @@ public void write(Record datum, VectorSchemaRoot root, int rowIndex) {

ColumnMetricsTracker<Object> tracker = (ColumnMetricsTracker<Object>) trackers[fieldIndex];
if (value == null) {
vector.setNull(rowIndex);
if (field.isRequired()) {
throw new IllegalArgumentException(
"Cannot write null value for required field: " + field);
}

writeNull(vector, field.type(), rowIndex);
tracker.addNull();
continue;
}
Expand Down Expand Up @@ -225,13 +236,71 @@ private static void writeValue(
}
// Mark the struct slot itself as non-null for this row.
structVector.setIndexDefined(rowIndex);
break;
case VARIANT:
writeVariant((StructVector) vector, (Variant) value, rowIndex);

break;
default:
throw new UnsupportedOperationException(
"Unsupported Iceberg type for Vortex write: " + type);
}
}

/**
 * Writes a null at {@code rowIndex}. Variant columns are routed through {@code writeNullVariant};
 * every other type simply has its slot marked null.
 */
private static void writeNull(FieldVector vector, Type type, int rowIndex) {
  if (!type.isVariantType()) {
    vector.setNull(rowIndex);
    return;
  }

  writeNullVariant((StructVector) vector, rowIndex);
}

/**
 * Writes a null variant at {@code rowIndex}: the struct slot is marked null, empty variant
 * metadata is written to the {@code metadata} child, and the {@code value} child, when present,
 * is marked null.
 */
private static void writeNullVariant(StructVector vector, int rowIndex) {
  vector.setNull(rowIndex);

  // NOTE(review): metadata is populated even for a null row, presumably so that child never
  // carries a null slot -- confirm against the reader's expectations.
  VarBinaryVector metadataChild = vector.getChild("metadata", VarBinaryVector.class);
  writeVariantMetadata(metadataChild, VariantMetadata.empty(), rowIndex);

  VarBinaryVector valueChild = vector.getChild("value", VarBinaryVector.class);
  if (valueChild == null) {
    return;
  }

  valueChild.setNull(rowIndex);
}

/**
 * Writes a non-null variant at {@code rowIndex}: marks the struct slot defined, then serializes
 * the variant's metadata and value into the {@code metadata} and {@code value} children.
 */
private static void writeVariant(StructVector vector, Variant variant, int rowIndex) {
  vector.setIndexDefined(rowIndex);

  VarBinaryVector metadataChild = vector.getChild("metadata", VarBinaryVector.class);
  writeVariantMetadata(metadataChild, variant.metadata(), rowIndex);

  VarBinaryVector valueChild = vector.getChild("value", VarBinaryVector.class);
  writeVariantValue(valueChild, variant.value(), rowIndex);
}

/**
 * Stores variant metadata at {@code rowIndex}. Metadata that is already {@link Serialized} is
 * copied from its backing buffer; anything else is encoded into a freshly allocated little-endian
 * buffer sized by {@code sizeInBytes()}.
 */
private static void writeVariantMetadata(
    VarBinaryVector vector, VariantMetadata metadata, int rowIndex) {
  if (metadata instanceof Serialized serialized) {
    writeSerialized(vector, serialized, rowIndex);
  } else {
    int capacity = metadata.sizeInBytes();
    ByteBuffer scratch = ByteBuffer.allocate(capacity).order(ByteOrder.LITTLE_ENDIAN);
    // writeTo reports how many bytes it produced; only that prefix is stored.
    int written = metadata.writeTo(scratch, 0);
    vector.setSafe(rowIndex, scratch, 0, written);
  }
}

/**
 * Stores a variant value at {@code rowIndex}. A value that is already {@link Serialized} is copied
 * from its backing buffer; anything else is encoded into a freshly allocated little-endian buffer
 * sized by {@code sizeInBytes()}.
 */
private static void writeVariantValue(VarBinaryVector vector, VariantValue value, int rowIndex) {
  if (value instanceof Serialized serialized) {
    writeSerialized(vector, serialized, rowIndex);
  } else {
    int capacity = value.sizeInBytes();
    ByteBuffer scratch = ByteBuffer.allocate(capacity).order(ByteOrder.LITTLE_ENDIAN);
    // writeTo reports how many bytes it produced; only that prefix is stored.
    int written = value.writeTo(scratch, 0);
    vector.setSafe(rowIndex, scratch, 0, written);
  }
}

/** Copies the bytes of an already-serialized variant component into {@code vector} at {@code rowIndex}. */
private static void writeSerialized(VarBinaryVector vector, Serialized serialized, int rowIndex) {
  byte[] bytes = ByteBuffers.toByteArray(serialized.buffer());
  vector.setSafe(rowIndex, bytes);
}

@SuppressWarnings({"unchecked", "rawtypes"})
private static ColumnMetricsTracker<?> newTracker(Types.NestedField field) {
switch (field.type().typeId()) {
Expand Down Expand Up @@ -274,7 +343,7 @@ private static ColumnMetricsTracker<?> newTracker(Types.NestedField field) {
v -> ChronoUnit.NANOS.between(LOCAL_EPOCH, (LocalDateTime) v));
}
default:
if (field.type().isNestedType()) {
if (field.type().isNestedType() || field.type().isVariantType()) {
// Lists, maps, structs, and variants have no natural ordering — track counts only.
return new ColumnMetricsTracker<>(field.fieldId(), null);
}
Expand All @@ -296,6 +365,10 @@ static class ColumnMetricsTracker<T> {
private T min;
private T max;

/**
 * Creates a tracker for {@code fieldId}, delegating to the three-argument constructor with
 * {@code null} for both remaining arguments.
 */
ColumnMetricsTracker(int fieldId) {
this(fieldId, null, null);
}

/**
 * Creates a tracker for {@code fieldId} with the given comparator, delegating to the
 * three-argument constructor with {@code null} as the last argument.
 *
 * @param comparator may be {@code null}; NOTE(review): a null comparator appears to mean
 *     "counts only, no min/max" -- confirm against the three-argument constructor
 */
ColumnMetricsTracker(int fieldId, Comparator<T> comparator) {
this(fieldId, comparator, null);
}
Expand Down
15 changes: 15 additions & 0 deletions vortex/src/main/java/org/apache/iceberg/vortex/VortexMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ static Metrics buildMetrics(
}
});

addVariantValueCounts(rowCount, schema, metricsConfig, valueCounts);

return new Metrics(
rowCount,
null, // columnSizes not available without Vortex JNI support
Expand All @@ -101,6 +103,19 @@ static Metrics buildMetrics(
originalTypes.isEmpty() ? null : originalTypes);
}

/**
 * Fills in a value count for each top-level variant column that does not already have one.
 *
 * <p>For every column in {@code schema.columns()} that is a variant type, whose metrics mode is
 * not {@code None}, and that has no entry in {@code valueCounts} yet, the count is recorded as
 * {@code rowCount}.
 */
private static void addVariantValueCounts(
    long rowCount, Schema schema, MetricsConfig metricsConfig, Map<Integer, Long> valueCounts) {
  for (Types.NestedField column : schema.columns()) {
    int fieldId = column.fieldId();
    MetricsModes.MetricsMode mode = MetricsUtil.metricsMode(schema, metricsConfig, fieldId);

    if (!column.type().isVariantType()) {
      continue;
    }

    if (mode == MetricsModes.None.get()) {
      continue;
    }

    if (valueCounts.containsKey(fieldId)) {
      continue;
    }

    valueCounts.put(fieldId, rowCount);
  }
}

private static int truncateLength(MetricsModes.MetricsMode mode) {
if (mode instanceof MetricsModes.Truncate truncate) {
return truncate.length();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public abstract class VortexSchemaWithTypeVisitor<T> {

/**
 * Produces the visitor result for a primitive Iceberg type paired with an Arrow field.
 *
 * @param iPrimitive the Iceberg primitive type; NOTE(review): may be {@code null} if callers
 *     pass no Iceberg type -- confirm against the visit methods
 * @param primField the Arrow field for the primitive column
 */
public abstract T primitive(Type.PrimitiveType iPrimitive, Field primField);

/**
 * Produces the visitor result for a variant column.
 *
 * <p>{@code visit(Type, Field, VortexSchemaWithTypeVisitor)} calls this when the Iceberg type is a
 * variant type or when {@code VortexSchemas.isVariantField(field)} is true.
 *
 * @param variantType the Iceberg variant type, or {@code null} when the visit was started without
 *     an Iceberg type
 * @param variantField the Arrow field for the variant column
 */
public abstract T variant(Types.VariantType variantType, Field variantField);

public static <T> T visit(
Schema expectedSchema,
org.apache.arrow.vector.types.pojo.Schema fileSchema,
Expand All @@ -48,6 +50,10 @@ public static <T> T visit(
}

public static <T> T visit(Type iType, Field field, VortexSchemaWithTypeVisitor<T> visitor) {
if ((iType != null && iType.isVariantType()) || VortexSchemas.isVariantField(field)) {
return visitor.variant(iType != null ? iType.asVariantType() : null, field);
}

ArrowType arrowType = field.getType();
if (arrowType instanceof ArrowType.Struct) {
return visitStruct(iType != null ? iType.asStructType() : null, field.getChildren(), visitor);
Expand Down
Loading
Loading