Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions encodings/alp/src/alp/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -695,22 +695,15 @@ mod tests {
})
.collect();

let mut ctx = SESSION.create_execution_ctx();
let array = PrimitiveArray::from_option_iter(values.clone());
let encoded = alp_encode(
array.as_view(),
None,
&mut LEGACY_SESSION.create_execution_ctx(),
)
.unwrap();
let encoded = alp_encode(array.as_view(), None, &mut ctx).unwrap();

let slice_end = size - slice_start;
let slice_len = slice_end - slice_start;
let sliced_encoded = encoded.slice(slice_start..slice_end).unwrap();

let result_canonical = {
let mut ctx = SESSION.create_execution_ctx();
sliced_encoded.execute::<Canonical>(&mut ctx).unwrap()
};
let result_canonical = sliced_encoded.execute::<Canonical>(&mut ctx).unwrap();
let result_primitive = result_canonical.into_primitive();

for idx in 0..slice_len {
Expand All @@ -719,7 +712,7 @@ mod tests {
let result_valid = result_primitive
.validity()
.vortex_expect("result validity should be derivable")
.is_valid(idx)
.execute_is_valid(idx, &mut ctx)
.unwrap();
assert_eq!(
result_valid,
Expand Down
2 changes: 1 addition & 1 deletion encodings/fastlanes/src/rle/array/rle_compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ mod tests {
let mut rng_state: u32 = 0xDEAD_BEEF;
let validity = indices_prim.validity()?;
for (i, idx) in indices_data.iter_mut().enumerate() {
if !validity.is_valid(i).unwrap_or(true) {
if !validity.execute_is_valid(i, ctx).unwrap_or(true) {
// xorshift32
rng_state ^= rng_state << 13;
rng_state ^= rng_state >> 17;
Expand Down
2 changes: 1 addition & 1 deletion encodings/parquet-variant/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl OperationsVTable<ParquetVariant> for ParquetVariant {
index: usize,
ctx: &mut ExecutionCtx,
) -> VortexResult<Scalar> {
if array.validity()?.is_null(index)? {
if array.validity()?.execute_is_null(index, ctx)? {
return Ok(Scalar::null(DType::Variant(Nullability::Nullable)));
}

Expand Down
13 changes: 7 additions & 6 deletions vortex-array/src/arrays/constant/vtable/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,14 +754,15 @@ mod tests {
let element_validity = elements
.validity()
.vortex_expect("constant canonical element validity should be derivable");
assert!(element_validity.is_valid(0).unwrap());
assert!(!element_validity.is_valid(1).unwrap());
assert!(element_validity.is_valid(2).unwrap());
let mut ctx = LEGACY_SESSION.create_execution_ctx();
assert!(element_validity.execute_is_valid(0, &mut ctx).unwrap());
assert!(!element_validity.execute_is_valid(1, &mut ctx).unwrap());
assert!(element_validity.execute_is_valid(2, &mut ctx).unwrap());

// Pattern should repeat.
assert!(element_validity.is_valid(3).unwrap());
assert!(!element_validity.is_valid(4).unwrap());
assert!(element_validity.is_valid(5).unwrap());
assert!(element_validity.execute_is_valid(3, &mut ctx).unwrap());
assert!(!element_validity.execute_is_valid(4, &mut ctx).unwrap());
assert!(element_validity.execute_is_valid(5, &mut ctx).unwrap());
}

#[test]
Expand Down
15 changes: 10 additions & 5 deletions vortex-array/src/arrays/fixed_size_list/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ use vortex_error::vortex_ensure;

use crate::ArrayRef;
use crate::ArraySlots;
use crate::LEGACY_SESSION;
use crate::VortexSessionExecute;
use crate::array::Array;
use crate::array::ArrayParts;
use crate::array::TypedArrayRef;
Expand Down Expand Up @@ -236,11 +238,14 @@ pub trait FixedSizeListArrayExt: TypedArrayRef<FixedSizeList> {
index,
self.as_ref().len(),
);
debug_assert!(
self.fixed_size_list_validity()
.is_valid(index)
.unwrap_or(false)
);
#[expect(clippy::debug_assert_with_mut_call)]
{
debug_assert!(
self.fixed_size_list_validity()
.execute_is_valid(index, &mut LEGACY_SESSION.create_execution_ctx())
.unwrap_or(false)
);
}

let start = self.list_size() as usize * index;
let end = self.list_size() as usize * (index + 1);
Expand Down
15 changes: 10 additions & 5 deletions vortex-array/src/arrays/listview/rebuild.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,9 +209,11 @@ impl ListViewArray {
let mut new_sizes = BufferMut::<S>::with_capacity(len);
let mut take_indices = BufferMut::<u64>::with_capacity(self.elements().len());

let mut ctx = LEGACY_SESSION.create_execution_ctx();
let validity = self.validity()?;
let mut n_elements = NewOffset::zero();
for index in 0..len {
if !self.validity()?.is_valid(index)? {
if !validity.execute_is_valid(index, &mut ctx)? {
new_offsets.push(n_elements);
new_sizes.push(S::zero());
continue;
Expand Down Expand Up @@ -292,9 +294,11 @@ impl ListViewArray {
let mut new_elements_builder =
builder_with_capacity(element_dtype.as_ref(), self.elements().len());

let mut ctx = LEGACY_SESSION.create_execution_ctx();
let validity = self.validity()?;
let mut n_elements = NewOffset::zero();
for index in 0..len {
if !self.validity()?.is_valid(index)? {
if !validity.execute_is_valid(index, &mut ctx)? {
// For NULL lists, place them after the previous item's data to maintain the
// no-overlap invariant for zero-copy to `ListArray` arrays.
new_offsets.push(n_elements);
Expand Down Expand Up @@ -482,9 +486,10 @@ mod tests {

// Verify nullability is preserved
assert_eq!(flattened.dtype().nullability(), Nullability::Nullable);
assert!(flattened.validity()?.is_valid(0)?);
assert!(!flattened.validity()?.is_valid(1)?);
assert!(flattened.validity()?.is_valid(2)?);
let mut ctx = LEGACY_SESSION.create_execution_ctx();
assert!(flattened.validity()?.execute_is_valid(0, &mut ctx)?);
assert!(!flattened.validity()?.execute_is_valid(1, &mut ctx)?);
assert!(flattened.validity()?.execute_is_valid(2, &mut ctx)?);

// Verify valid lists contain correct data
assert_arrays_eq!(
Expand Down
99 changes: 55 additions & 44 deletions vortex-array/src/arrays/varbin/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::fmt::Formatter;

use num_traits::AsPrimitive;
use smallvec::smallvec;
use vortex_array::arrays::PrimitiveArray;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
Expand All @@ -15,8 +16,6 @@ use vortex_error::vortex_err;
use crate::ArrayRef;
use crate::ArraySlots;
use crate::LEGACY_SESSION;
#[expect(deprecated)]
use crate::ToCanonical as _;
use crate::VortexSessionExecute;
use crate::array::Array;
use crate::array::ArrayParts;
Expand Down Expand Up @@ -208,26 +207,6 @@ impl VarBinData {
InvalidArgument: "Offsets must have at least one element"
);

// Skip host-only validation when offsets/bytes are not host-resident.
if offsets.is_host() && bytes.is_on_host() {
let last_offset = offsets
.execute_scalar(
offsets.len() - 1,
&mut LEGACY_SESSION.create_execution_ctx(),
)?
.as_primitive()
.as_::<usize>()
.ok_or_else(
|| vortex_err!(InvalidArgument: "Last offset must be convertible to usize"),
)?;
vortex_ensure!(
last_offset <= bytes.len(),
InvalidArgument: "Last offset {} exceeds bytes length {}",
last_offset,
bytes.len()
);
}

// Check validity length
if let Some(validity_len) = validity.maybe_len() {
vortex_ensure!(
Expand All @@ -244,33 +223,65 @@ impl VarBinData {
&& matches!(dtype, DType::Utf8(_))
&& let Some(bytes) = bytes.as_host_opt()
{
#[expect(deprecated)]
let primitive_offsets = offsets.to_primitive();
match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
let offsets_slice = primitive_offsets.as_slice::<O>();
for (i, (start, end)) in offsets_slice
.windows(2)
.map(|o| (o[0].as_(), o[1].as_()))
.enumerate()
{
if validity.is_null(i)? {
continue;
}

let string_bytes = &bytes.as_ref()[start..end];
simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
#[expect(clippy::unwrap_used)]
// run validation using `compat` package to get more detailed error message
let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
vortex_err!("invalid utf-8: {err} at index {i}")
})?;
}
});
Self::validate_utf8(offsets, bytes.as_ref(), validity)?;
}

Ok(())
}

/// Validates that every non-null value is valid UTF-8.
///
/// The offsets are executed into a [`PrimitiveArray`] and walked pairwise; each
/// `offsets[i]..offsets[i + 1]` range of `bytes` that is not null according to `validity`
/// is checked with `simdutf8`.
///
/// # Errors
///
/// Returns an error if:
/// - executing the offsets (or an array-backed validity) fails,
/// - the offsets are empty, or the last offset exceeds `bytes.len()`,
/// - a non-null value's offsets do not form a valid range into `bytes`
///   (start greater than end, or end past the end of `bytes`),
/// - a non-null value is not valid UTF-8.
fn validate_utf8(offsets: &ArrayRef, bytes: &[u8], validity: &Validity) -> VortexResult<()> {
    let validate_at = |i: usize, start: usize, end: usize| -> VortexResult<()> {
        // Only the last offset is bounds-checked up front, so a non-monotonic or otherwise
        // corrupt pair must surface as a validation error here instead of a slicing panic.
        let string_bytes = bytes.get(start..end).ok_or_else(|| {
            vortex_err!(
                InvalidArgument: "Offsets {}..{} at index {} are not a valid range into bytes of length {}",
                start,
                end,
                i,
                bytes.len()
            )
        })?;
        simdutf8::basic::from_utf8(string_bytes).map_err(|_| {
            #[expect(clippy::unwrap_used)]
            // run validation using `compat` package to get more detailed error message
            let err = simdutf8::compat::from_utf8(string_bytes).unwrap_err();
            vortex_err!("invalid utf-8: {err} at index {i}")
        })?;
        Ok(())
    };

    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    // TODO(joe): update the created VarBin with this decompressed Array.
    let primitive_offsets = offsets.clone().execute::<PrimitiveArray>(&mut ctx)?;

    // Array-backed validity is the only variant that needs an execution context: execute it into
    // a mask once. The constant variants resolve null-ness without one. Resolving this before
    // the per-type dispatch keeps the dtype loop simple.
    let mask = match validity {
        Validity::Array(_) => {
            // There is one value per adjacent pair of offsets, hence `len - 1` entries.
            Some(validity.execute_mask(primitive_offsets.len().saturating_sub(1), &mut ctx)?)
        }
        _ => None,
    };
    let all_invalid = matches!(validity, Validity::AllInvalid);

    match_each_integer_ptype!(primitive_offsets.dtype().as_ptype(), |O| {
        let offsets_slice = primitive_offsets.as_slice::<O>();

        // `last()` instead of indexing with `len() - 1`, so an empty offsets buffer is reported
        // as an error rather than underflowing.
        let last_offset: usize = offsets_slice
            .last()
            .copied()
            .ok_or_else(
                || vortex_err!(InvalidArgument: "Offsets must have at least one element"),
            )?
            .as_();
        vortex_ensure!(
            last_offset <= bytes.len(),
            InvalidArgument: "Last offset {} exceeds bytes length {}",
            last_offset,
            bytes.len()
        );

        for (i, (start, end)) in offsets_slice
            .windows(2)
            .map(|o| (o[0].as_(), o[1].as_()))
            .enumerate()
        {
            // Without a mask the validity is constant: everything is valid unless `AllInvalid`.
            let valid = mask.as_ref().map_or(!all_invalid, |mask| mask.value(i));
            if valid {
                validate_at(i, start, end)?;
            }
        }
    });
    Ok(())
}

/// Access the value bytes child buffer
///
/// # Note
Expand Down
4 changes: 3 additions & 1 deletion vortex-array/src/arrays/varbin/compute/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ fn filter_select_var_bin_by_index(
mask_indices,
values.validity()?,
selection_count,
ctx,
)
})
}
Expand All @@ -184,10 +185,11 @@ fn filter_select_var_bin_by_index_primitive_offset<O: IntegerPType>(
// TODO(ngates): pass LogicalValidity instead
validity: Validity,
selection_count: usize,
ctx: &mut ExecutionCtx,
) -> VortexResult<VarBinArray> {
let mut builder = VarBinBuilder::<O>::with_capacity(selection_count);
for idx in mask_indices.iter().copied() {
if validity.is_valid(idx)? {
if validity.execute_is_valid(idx, ctx)? {
let (start, end) = (
offsets[idx].to_usize().ok_or_else(|| {
vortex_err!("Failed to convert offset to usize: {}", offsets[idx])
Expand Down
32 changes: 27 additions & 5 deletions vortex-array/src/arrays/varbinview/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use vortex_error::vortex_err;
use vortex_error::vortex_panic;

use crate::ArraySlots;
use crate::LEGACY_SESSION;
use crate::VortexSessionExecute;
use crate::array::Array;
use crate::array::ArrayParts;
use crate::array::TypedArrayRef;
Expand Down Expand Up @@ -316,11 +318,7 @@ impl VarBinViewData {
where
F: Fn(&[u8]) -> bool,
{
for (idx, &view) in views.iter().enumerate() {
if validity.is_null(idx)? {
continue;
}

let validate_view = |idx: usize, view: &BinaryView| -> VortexResult<()> {
if view.is_inlined() {
// Validate the inline bytestring
let bytes = &view.as_inlined().data[..view.len() as usize];
Expand Down Expand Up @@ -364,6 +362,30 @@ impl VarBinViewData {
InvalidArgument: "view at index {idx}: outlined bytes fails utf-8 validation"
);
}
Ok(())
};

match validity {
// Array-backed validity is the only variant that needs an execution context: execute it
// into a mask once and zip it with the views, validating only the valid (non-null)
// entries.
Validity::Array(_) => {
let mut ctx = LEGACY_SESSION.create_execution_ctx();
let mask = validity.execute_mask(views.len(), &mut ctx)?;
for ((idx, view), valid) in views.iter().enumerate().zip(mask.iter()) {
if valid {
validate_view(idx, view)?;
}
}
}
// Every entry is null, so there is nothing to validate.
Validity::AllInvalid => {}
// No nulls: validate every view.
Validity::NonNullable | Validity::AllValid => {
for (idx, view) in views.iter().enumerate() {
validate_view(idx, view)?;
}
}
}

Ok(())
Expand Down
Loading
Loading