Skip to content

Commit d2fee0e

Browse files
committed
more compressor pieces
Signed-off-by: Adam Gutglick <adam@spiraldb.com>
1 parent bb26650 commit d2fee0e

1 file changed

Lines changed: 41 additions & 39 deletions

File tree

  • vortex-btrblocks/src/variant

vortex-btrblocks/src/variant/mod.rs

Lines changed: 41 additions & 39 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,7 @@ use vortex_array::ArrayRef;
1616
use vortex_array::ArrayView;
1717
use vortex_array::Canonical;
1818
use vortex_array::EmptyArrayData;
19+
use vortex_array::EmptyMetadata;
1920
use vortex_array::ExecutionCtx;
2021
use vortex_array::ExecutionResult;
2122
use vortex_array::IntoArray;
@@ -31,7 +32,6 @@ use vortex_array::dtype::DType;
3132
use vortex_array::dtype::extension::ExtDType;
3233
use vortex_array::dtype::extension::ExtId;
3334
use vortex_array::dtype::extension::ExtVTable;
34-
use vortex_array::extension::EmptyMetadata;
3535
use vortex_array::scalar::ScalarValue;
3636
use vortex_array::serde::ArrayChildren;
3737
use vortex_array::validity::Validity;
@@ -407,13 +407,15 @@ impl Scheme for JsonToVariantScheme {
407407
})
408408
.transpose()?;
409409

410-
ParquetVariant::try_new(
410+
let variant = ParquetVariant::try_new(
411411
parquet_variant.validity()?,
412412
compressed_metadata,
413413
compressed_value,
414414
parquet_variant.typed_value_array().cloned(),
415-
)
416-
.map(IntoArray::into_array)
415+
)?
416+
.into_array();
417+
418+
Ok(VariantToJson::try_new(variant)?.into_array())
417419
}
418420
}
419421

@@ -427,17 +429,14 @@ mod tests {
427429
use vortex_array::IntoArray;
428430
use vortex_array::VortexSessionExecute;
429431
use vortex_array::accessor::ArrayAccessor;
430-
use vortex_array::arrays::Extension;
431432
use vortex_array::arrays::ExtensionArray;
432-
use vortex_array::arrays::VarBinView;
433433
use vortex_array::arrays::VarBinViewArray;
434434
use vortex_array::arrays::extension::ExtensionArrayExt;
435435
use vortex_array::session::ArraySession;
436436
use vortex_compressor::builtins::BinaryDictScheme;
437437
use vortex_compressor::builtins::IntConstantScheme;
438438
use vortex_compressor::builtins::StringConstantScheme;
439439
use vortex_compressor::builtins::StringDictScheme;
440-
use vortex_fsst::FSST;
441440
use vortex_session::VortexSession;
442441

443442
use super::*;
@@ -530,15 +529,16 @@ mod tests {
530529

531530
#[test]
532531
fn variant_to_json_canonicalizes_to_json_extension() -> VortexResult<()> {
533-
let values = vec![
532+
let values = [
534533
"0".to_string(),
535534
r#"{"a":32}"#.to_string(),
536535
r#""hello""#.to_string(),
537536
"null".to_string(),
538537
];
539-
let source = json_array(&values)?;
540-
let source_ext = source.as_::<Extension>();
541-
let storage = source_ext.storage_array().clone();
538+
let storage =
539+
VarBinViewArray::from_iter_str(values.iter().map(String::as_str)).into_array();
540+
let source =
541+
ExtensionArray::try_new_from_vtable(Json, EmptyMetadata, storage.clone())?.into_array();
542542

543543
let mut exec_ctx = SESSION.create_execution_ctx();
544544
let arrow_array = {
@@ -556,7 +556,8 @@ mod tests {
556556
let json = wrapped
557557
.into_array()
558558
.execute::<ExtensionArray>(&mut exec_ctx)?;
559-
assert!(json.ext_dtype().is::<Json>());
559+
assert_eq!(json.dtype(), source.dtype());
560+
assert!(json.storage_array().dtype().is_utf8());
560561
let json_storage = json
561562
.storage_array()
562563
.clone()
@@ -590,6 +591,23 @@ mod tests {
590591
])
591592
}
592593

594+
#[test]
595+
fn json_to_variant_scheme_wraps_output_as_json() -> VortexResult<()> {
596+
let array = json_array(&json_data())?;
597+
598+
let variant_compressor = parquet_variant_child_compressor();
599+
let mut exec_ctx = SESSION.create_execution_ctx();
600+
let compressed = variant_compressor.compress(&array, &mut exec_ctx)?;
601+
602+
assert_eq!(compressed.dtype(), array.dtype());
603+
604+
let json = compressed.execute::<ExtensionArray>(&mut exec_ctx)?;
605+
assert_eq!(json.dtype(), array.dtype());
606+
assert!(json.storage_array().dtype().is_utf8());
607+
608+
Ok(())
609+
}
610+
593611
fn print_comparison_output(
594612
array: &ArrayRef,
595613
string_compressed: &ArrayRef,
@@ -629,13 +647,6 @@ mod tests {
629647
let mut exec_ctx = SESSION.create_execution_ctx();
630648
let variant_compressed = variant_compressor.compress(&array, &mut exec_ctx)?;
631649

632-
assert!(
633-
variant_compressed.is::<ParquetVariant>(),
634-
"expected ParquetVariant output, got encoding {} with dtype {} and {} bytes",
635-
variant_compressed.encoding_id(),
636-
variant_compressed.dtype(),
637-
variant_compressed.nbytes()
638-
);
639650
assert!(
640651
variant_compressed.nbytes() < string_compressed.nbytes(),
641652
"Parquet Variant conversion should compress repeated JSON keys: \
@@ -653,19 +664,21 @@ mod tests {
653664
fn recursively_compresses_parquet_variant_binary_children() -> VortexResult<()> {
654665
let array: ArrayRef = json_array(&json_data())?;
655666

667+
let mut exec_ctx = SESSION.create_execution_ctx();
668+
let uncompressed_children =
669+
CascadingCompressor::new(vec![&JsonToVariantScheme]).compress(&array, &mut exec_ctx)?;
670+
656671
let variant_compressor = parquet_variant_child_compressor();
657672
let mut exec_ctx = SESSION.create_execution_ctx();
658673
let compressed = variant_compressor.compress(&array, &mut exec_ctx)?;
659-
let parquet_variant = compressed.downcast::<ParquetVariant>();
660674

661675
assert!(
662-
!parquet_variant.metadata_array().is::<VarBinView>(),
663-
"expected Parquet Variant metadata child to be compressed, got {}",
664-
parquet_variant.metadata_array().encoding_id(),
676+
compressed.nbytes() < uncompressed_children.nbytes(),
677+
"recursive child compression should reduce Parquet Variant size: compressed={} bytes, uncompressed_children={} bytes",
678+
compressed.nbytes(),
679+
uncompressed_children.nbytes(),
665680
);
666-
assert!(parquet_variant.value_array().is_some());
667-
assert!(parquet_variant.typed_value_array().is_none());
668-
681+
assert_eq!(compressed.dtype(), array.dtype());
669682
Ok(())
670683
}
671684

@@ -689,24 +702,13 @@ mod tests {
689702
let mut exec_ctx = SESSION.create_execution_ctx();
690703
let with_binary_fsst =
691704
parquet_variant_child_compressor().compress(&array, &mut exec_ctx)?;
692-
let parquet_variant = with_binary_fsst.clone().downcast::<ParquetVariant>();
693705

694706
assert!(
695707
with_binary_fsst.nbytes() < without_binary_fsst.nbytes(),
696708
"binary FSST should improve Parquet Variant child compression: with={} bytes, without={} bytes",
697709
with_binary_fsst.nbytes(),
698710
without_binary_fsst.nbytes(),
699711
);
700-
assert!(
701-
parquet_variant
702-
.value_array()
703-
.is_some_and(|value| value.is::<FSST>()),
704-
"expected Parquet Variant value child to use binary FSST, got {}",
705-
parquet_variant.value_array().map_or_else(
706-
|| "missing".to_string(),
707-
|value| value.encoding_id().to_string()
708-
),
709-
);
710712

711713
Ok(())
712714
}
@@ -732,9 +734,9 @@ mod tests {
732734
let variant_compressor = CascadingCompressor::new(vec![
733735
&JsonToVariantScheme,
734736
&BinaryDictScheme,
735-
&FSSTScheme,
737+
// &FSSTScheme,
736738
&BinaryFSSTScheme,
737-
// &ZstdScheme,
739+
// &crate::schemes::binary::BinaryZstdScheme,
738740
&IntConstantScheme,
739741
&StringConstantScheme,
740742
&FoRScheme,

0 commit comments

Comments
 (0)