Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

109 changes: 105 additions & 4 deletions encodings/parquet-variant/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use vortex_array::ArrayRef;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_array::VTable;
use vortex_array::arrays::Variant;
use vortex_array::arrays::variant::VariantArrayExt;
use vortex_array::arrow::ArrowExport;
use vortex_array::arrow::ArrowExportVTable;
use vortex_array::arrow::ArrowImport;
Expand Down Expand Up @@ -113,6 +115,35 @@ fn export_unshredded_storage_to_target<T: ParquetVariantArrayExt>(
export_storage_to_target(&unshredded_parquet, target_fields, ctx)
}

fn parquet_variant_for_export(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
let executed = array.execute_until::<ParquetVariant>(ctx)?;
if executed.is::<ParquetVariant>() {
return Ok(executed);
}

let variant = executed
.as_opt::<Variant>()
.ok_or_else(|| vortex_err!("cannot export Variant without ParquetVariant storage"))?;
let core_storage = variant
.core_storage()
.clone()
.execute_until::<ParquetVariant>(ctx)?;
let parquet_core = core_storage
.as_opt::<ParquetVariant>()
.ok_or_else(|| vortex_err!("cannot export Variant without ParquetVariant core storage"))?;
let Some(shredded) = variant.shredded() else {
return Ok(core_storage);
};

ParquetVariant::try_new(
ParquetVariantArrayExt::validity(&parquet_core),
parquet_core.metadata_array().clone(),
parquet_core.value_array().cloned(),
Some(shredded.clone()),
)
.map(IntoArray::into_array)
}

impl ArrowExportVTable for ParquetVariant {
fn arrow_ext_id(&self) -> Id {
*ARROW_PARQUET_VARIANT
Expand Down Expand Up @@ -148,10 +179,8 @@ impl ArrowExportVTable for ParquetVariant {
return Ok(ArrowExport::Unsupported(array));
}

let executed = array.execute_until::<ParquetVariant>(ctx)?;
let parquet_array = executed
.as_opt::<ParquetVariant>()
.ok_or_else(|| vortex_err!("cannot export Variant without ParquetVariant storage"))?;
let parquet_array = parquet_variant_for_export(array, ctx)?;
let parquet_array = parquet_array.as_::<ParquetVariant>();

if let DataType::Struct(fields) = target.data_type()
&& let Some((request_has_value, request_has_typed_value)) =
Expand Down Expand Up @@ -261,6 +290,7 @@ mod tests {
use vortex_array::VortexSessionExecute;
use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::VariantArray;
use vortex_array::arrow::ArrowSessionExt;
use vortex_array::assert_arrays_eq;
use vortex_array::dtype::DType;
Expand Down Expand Up @@ -359,6 +389,77 @@ mod tests {
Ok(())
}

#[rstest]
fn export_canonical_variant_with_parquet_variant_core_storage(
session: VortexSession,
) -> VortexResult<()> {
let storage = arrow_variant_storage();
let field = arrow_variant_field(&storage);
let core_storage = session
.arrow()
.from_arrow_array(Arc::new(storage.clone()) as ArrowArrayRef, &field)?;
let canonical = VariantArray::try_new(core_storage, None)?.into_array();

let mut ctx = session.create_execution_ctx();
let exported = session
.arrow()
.execute_arrow(canonical, Some(&field), &mut ctx)?;
let exported = exported.as_struct();

assert_struct_arrays_eq(exported, &storage);
Ok(())
}

#[rstest]
fn export_canonical_variant_reattaches_shredded_child(
session: VortexSession,
) -> VortexResult<()> {
let rows = [
VariantBuilder::new().with_value(10i32).finish(),
VariantBuilder::new().with_value(20i32).finish(),
VariantBuilder::new().with_value(30i32).finish(),
];
let metadata =
VarBinViewArray::from_iter_bin(rows.iter().map(|(metadata, _)| metadata.as_slice()))
.into_array();
let typed_value = buffer![10i32, 20, 30].into_array();
let expected =
ParquetVariant::try_new(Validity::NonNullable, metadata, None, Some(typed_value))?
.into_array();

let mut ctx = session.create_execution_ctx();
let canonical = expected
.clone()
.execute::<VariantArray>(&mut ctx)?
.into_array();
let field = Field::new(
"variant",
DataType::Struct(
vec![
Field::new("metadata", DataType::BinaryView, false),
Field::new("value", DataType::BinaryView, false),
]
.into(),
),
false,
)
.with_metadata(
[(
EXTENSION_TYPE_NAME_KEY.to_string(),
PARQUET_VARIANT_ARROW_EXTENSION_NAME.to_string(),
)]
.into(),
);

let exported = session
.arrow()
.execute_arrow(canonical, Some(&field), &mut ctx)?;
assert_eq!(exported.data_type(), field.data_type());

let actual = session.arrow().from_arrow_array(exported, &field)?;
assert_variant_scalars_eq(&actual, &expected, &session)
}

#[rstest]
fn roundtrip_parquet_variant_extension_array_from_vortex(
session: VortexSession,
Expand Down
4 changes: 3 additions & 1 deletion java/vortex-jni/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ mavenPublishing {
coordinates(groupId = "dev.vortex", artifactId = "vortex-jni", version = "${rootProject.version}")
publishToMavenCentral()

signAllPublications()
if (!project.hasProperty("skip.signing")) {
signAllPublications()
}

pom {
name = "vortex-jni"
Expand Down
97 changes: 97 additions & 0 deletions java/vortex-jni/src/test/java/dev/vortex/jni/JNIWriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dev.vortex.jni;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -26,17 +27,25 @@
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.arrow.vector.types.pojo.Schema;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

public final class JNIWriterTest {
private static final String ARROW_EXTENSION_NAME = "ARROW:extension:name";
private static final String PARQUET_VARIANT_EXTENSION_NAME = "arrow.parquet.variant";
private static final byte[] VARIANT_METADATA = new byte[] {0x01, 0x00};
private static final byte[] VARIANT_INT8_42 = new byte[] {0x0c, 0x2a};
private static final byte[] VARIANT_TRUE = new byte[] {0x04};

@TempDir
Path tempDir;
Expand All @@ -52,6 +61,45 @@ private static Schema personSchema() {
Field.notNullable("age", new ArrowType.Int(32, true))));
}

private static Schema parquetVariantSchema() {
Field variant = new Field(
"variant",
new FieldType(
true,
ArrowType.Struct.INSTANCE,
null,
Map.of(ARROW_EXTENSION_NAME, PARQUET_VARIANT_EXTENSION_NAME)),
List.of(
Field.notNullable("metadata", new ArrowType.Binary()),
Field.nullable("value", new ArrowType.Binary())));
return new Schema(List.of(variant));
}

private static void populateParquetVariantRoot(VectorSchemaRoot root) {
StructVector variant = (StructVector) root.getVector("variant");
VarBinaryVector metadata = variant.getChild("metadata", VarBinaryVector.class);
VarBinaryVector value = variant.getChild("value", VarBinaryVector.class);

variant.allocateNew();
metadata.allocateNew(3);
value.allocateNew(3);

metadata.setSafe(0, VARIANT_METADATA);
metadata.setSafe(1, VARIANT_METADATA);
metadata.setSafe(2, VARIANT_METADATA);
value.setSafe(0, VARIANT_INT8_42);
value.setSafe(1, VARIANT_TRUE);
value.setNull(2);
variant.setIndexDefined(0);
variant.setIndexDefined(1);
variant.setNull(2);

metadata.setValueCount(3);
value.setValueCount(3);
variant.setValueCount(3);
root.setRowCount(3);
}

@Test
public void testCreateWriter() throws IOException {
Path outputPath = tempDir.resolve("test_create.vortex");
Expand Down Expand Up @@ -155,4 +203,53 @@ public void testWriteBatch() throws IOException {
}
}
}

@Test
public void testParquetVariantRoundTrip() throws IOException {
Path outputPath = tempDir.resolve("test_parquet_variant.vortex");
String writePath = outputPath.toAbsolutePath().toUri().toString();

BufferAllocator allocator = ArrowAllocation.rootAllocator();
Schema schema = parquetVariantSchema();

Session session = Session.create();
try (VortexWriter writer = VortexWriter.create(session, writePath, schema, new HashMap<>(), allocator);
VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
populateParquetVariantRoot(root);

try (ArrowArray arrowArray = ArrowArray.allocateNew(allocator);
ArrowSchema arrowSchemaFfi = ArrowSchema.allocateNew(allocator)) {
Data.exportVectorSchemaRoot(allocator, root, null, arrowArray, arrowSchemaFfi);
writer.writeBatch(arrowArray.memoryAddress(), arrowSchemaFfi.memoryAddress());
}
}

assertTrue(Files.exists(outputPath), "output file should exist");

DataSource ds = DataSource.open(session, writePath);
Field dataSourceField = ds.arrowSchema(allocator).findField("variant");
assertEquals(
PARQUET_VARIANT_EXTENSION_NAME, dataSourceField.getMetadata().get(ARROW_EXTENSION_NAME));

Scan scan = ds.scan(ScanOptions.of());
Field scanField = scan.arrowSchema(allocator).findField("variant");
assertEquals(PARQUET_VARIANT_EXTENSION_NAME, scanField.getMetadata().get(ARROW_EXTENSION_NAME));

while (scan.hasNext()) {
Partition p = scan.next();
try (ArrowReader reader = p.scanArrow(allocator)) {
assertTrue(reader.loadNextBatch());
VectorSchemaRoot resultRoot = reader.getVectorSchemaRoot();
StructVector variant = (StructVector) resultRoot.getVector("variant");
VarBinaryVector metadata = variant.getChild("metadata", VarBinaryVector.class);
VarBinaryVector value = variant.getChild("value", VarBinaryVector.class);

assertArrayEquals(VARIANT_METADATA, metadata.get(0));
assertArrayEquals(VARIANT_INT8_42, value.get(0));
assertArrayEquals(VARIANT_METADATA, metadata.get(1));
assertArrayEquals(VARIANT_TRUE, value.get(1));
assertTrue(variant.isNull(2));
}
}
}
}
1 change: 1 addition & 0 deletions vortex-jni/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ tracing = { workspace = true, features = ["std", "log"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
url = { workspace = true }
vortex = { workspace = true, features = ["object_store", "files"] }
vortex-parquet-variant = { workspace = true }

[dev-dependencies]
jni = { workspace = true, features = ["invocation"] }
Expand Down
11 changes: 5 additions & 6 deletions vortex-jni/src/dtype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use arrow_array::ffi::FFI_ArrowSchema;
use arrow_schema::DataType;
use arrow_schema::FieldRef;
use arrow_schema::Fields;
use arrow_schema::Schema;
use vortex::dtype::DType;
use vortex::dtype::arrow::FromArrowType;
use vortex::error::VortexResult;

/// Export a Vortex [`DType`] to the Arrow C Data Interface struct at `schema_addr`. Views
Expand All @@ -24,7 +24,7 @@ pub(crate) fn export_dtype_to_arrow(dtype: &DType, schema_addr: i64) -> VortexRe
DataType::Struct(fields) => fields,
_ => unreachable!("Vortex DType always exports as a struct"),
};
let schema = arrow_schema::Schema::new(fields);
let schema = Schema::new(fields);
let ffi_schema = FFI_ArrowSchema::try_from(&schema)?;
unsafe {
ptr::write(schema_addr as *mut FFI_ArrowSchema, ffi_schema);
Expand Down Expand Up @@ -70,9 +70,8 @@ pub(crate) fn strip_views(data_type: DataType) -> DataType {
}
}

/// Decode an [`FFI_ArrowSchema`] pointed to by `schema_addr` into a Vortex [`DType`].
pub(crate) fn import_dtype_from_arrow(schema_addr: i64) -> VortexResult<DType> {
/// Decode an [`FFI_ArrowSchema`] pointed to by `schema_addr` into an Arrow [`Schema`].
pub(crate) fn import_arrow_schema(schema_addr: i64) -> VortexResult<Schema> {
let ffi_schema = unsafe { &*(schema_addr as *const FFI_ArrowSchema) };
let arrow_schema = arrow_schema::Schema::try_from(ffi_schema)?;
Ok(DType::from_arrow(&arrow_schema))
Ok(Schema::try_from(ffi_schema)?)
}
4 changes: 3 additions & 1 deletion vortex-jni/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ use crate::RUNTIME;
/// Constructs a fresh [`VortexSession`] bound to the JNI-shared tokio runtime and returns
/// an opaque pointer that Java must pass to [`Java_dev_vortex_jni_NativeSession_free`].
pub(crate) fn new_session() -> Box<VortexSession> {
Box::new(VortexSession::default().with_handle(RUNTIME.handle()))
let session = VortexSession::default().with_handle(RUNTIME.handle());
vortex_parquet_variant::initialize(&session);
Box::new(session)
}

/// SAFETY: caller must pass a pointer previously returned by [`new_session`].
Expand Down
Loading
Loading