diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index d540c54b7e6..25241cc9a02 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -104,3 +104,7 @@ harness = false [[bench]] name = "fsst_cuda" harness = false + +[[bench]] +name = "list_view_cuda" +harness = false diff --git a/vortex-cuda/benches/list_view_cuda.rs b/vortex-cuda/benches/list_view_cuda.rs new file mode 100644 index 00000000000..0904844b900 --- /dev/null +++ b/vortex-cuda/benches/list_view_cuda.rs @@ -0,0 +1,155 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! CUDA benchmarks for Arrow Device export of Vortex list-view arrays. + +#![expect(clippy::cast_possible_truncation)] + +#[allow(dead_code)] +mod bench_config; +mod timed_launch_strategy; + +use std::sync::Arc; +use std::sync::atomic::Ordering; +use std::time::Duration; + +use criterion::BenchmarkId; +use criterion::Criterion; +use criterion::Throughput; +use futures::executor::block_on; +use vortex::array::ArrayRef; +use vortex::array::IntoArray; +use vortex::array::arrays::ListViewArray; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::validity::Validity; +use vortex::dtype::PType; +use vortex::error::VortexExpect; +use vortex::error::VortexResult; +use vortex::session::VortexSession; +use vortex_cuda::CudaExecutionCtx; +use vortex_cuda::CudaSession; +use vortex_cuda::arrow::ArrowDeviceArray; +use vortex_cuda::arrow::DeviceArrayExt; +use vortex_cuda_macros::cuda_available; +use vortex_cuda_macros::cuda_not_available; + +use crate::timed_launch_strategy::TimedLaunchStrategy; + +const LIST_VIEW_CONTIGUOUS_BENCH_SIZES: &[(usize, &str)] = &[(10_000_000, "10M")]; +const LIST_VIEW_REBUILD_BENCH_SIZES: &[(usize, &str)] = &[(10_000_000, "10M")]; + +async fn primitive_i32_on_device( + values: impl IntoIterator, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let primitive = PrimitiveArray::from_iter(values); + let handle = ctx + .ensure_on_device(primitive.buffer_handle().clone()) + .await?; + Ok(PrimitiveArray::from_buffer_handle(handle, PType::I32, Validity::NonNullable).into_array()) +} + +async fn contiguous_list_view(len: usize, ctx: &mut CudaExecutionCtx) -> VortexResult { + let elements = primitive_i32_on_device((0..len).map(|value| value as i32), ctx).await?; + let offsets = primitive_i32_on_device((0..len).map(|value| value as i32), ctx).await?; + let sizes = primitive_i32_on_device(std::iter::repeat_n(1i32, len), ctx).await?; + + Ok(ListViewArray::new(elements, offsets, sizes, Validity::NonNullable).into_array()) +} + +async fn non_contiguous_primitive_list_view( + len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let elements = primitive_i32_on_device((0..len).map(|value| value as i32), ctx).await?; + let offsets = primitive_i32_on_device((0..len).rev().map(|value| value as i32), ctx).await?; + let sizes = primitive_i32_on_device(std::iter::repeat_n(1i32, len), ctx).await?; + + Ok(ListViewArray::new(elements, offsets, sizes, Validity::NonNullable).into_array()) +} + +unsafe fn release_arrow_device_array(array: &mut ArrowDeviceArray) { + unsafe { + if let Some(release) = array.array.release { + release(&raw mut array.array); + } + } +} + +fn benchmark_list_view_export(c: &mut Criterion) { + let mut group = c.benchmark_group("cuda"); + + for &(len, len_label) in LIST_VIEW_CONTIGUOUS_BENCH_SIZES { + // Contiguous path reads offsets/sizes and writes Arrow offsets. + group.throughput(Throughput::Bytes((len * size_of::() * 3) as u64)); + group.bench_with_input( + BenchmarkId::new("cuda/list_view/contiguous_offsets", len_label), + &len, + |b, &len| { + b.iter_custom(|iters| { + let timed = TimedLaunchStrategy::default(); + let timer = timed.timer(); + + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context") + .with_launch_strategy(Arc::new(timed)); + let array = block_on(contiguous_list_view(len, &mut cuda_ctx)) + .vortex_expect("failed to create list-view fixture"); + + for _ in 0..iters { + let mut exported = + block_on(array.clone().export_device_array(&mut cuda_ctx)) + .vortex_expect("failed to export device array"); + unsafe { release_arrow_device_array(&mut exported) }; + } + + Duration::from_nanos(timer.load(Ordering::Relaxed)) + }); + }, + ); + } + + for &(len, len_label) in LIST_VIEW_REBUILD_BENCH_SIZES { + // Rebuild path scans sizes into Arrow offsets, then gathers primitive child values. + group.throughput(Throughput::Bytes((len * size_of::() * 4) as u64)); + group.bench_with_input( + BenchmarkId::new("cuda/list_view/rebuild_primitive", len_label), + &len, + |b, &len| { + b.iter_custom(|iters| { + let timed = TimedLaunchStrategy::default(); + let timer = timed.timer(); + + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context") + .with_launch_strategy(Arc::new(timed)); + let array = block_on(non_contiguous_primitive_list_view(len, &mut cuda_ctx)) + .vortex_expect("failed to create list-view fixture"); + + for _ in 0..iters { + let mut exported = + block_on(array.clone().export_device_array(&mut cuda_ctx)) + .vortex_expect("failed to export device array"); + unsafe { release_arrow_device_array(&mut exported) }; + } + + Duration::from_nanos(timer.load(Ordering::Relaxed)) + }); + }, + ); + } + + group.finish(); +} + +criterion::criterion_group! { + name = benches; + config = bench_config::cuda_bench_config(); + targets = benchmark_list_view_export +} + +#[cuda_available] +criterion::criterion_main!(benches); + +#[cuda_not_available] +fn main() {} diff --git a/vortex-cuda/cub/build.rs b/vortex-cuda/cub/build.rs index 16b758a57dd..5888d9234dd 100644 --- a/vortex-cuda/cub/build.rs +++ b/vortex-cuda/cub/build.rs @@ -101,6 +101,7 @@ fn generate_rust_bindings(kernels_dir: &Path, out_dir: &Path) { .allowlist_function("filter_temp_size_.*") .allowlist_function("filter_bytemask_.*") .allowlist_function("filter_bitmask_.*") + .allowlist_function("scan_exclusive_sum_.*") // Allow CUDA types .allowlist_type("cudaError_t") // Blocklist cudaStream_t and define it manually as an opaque pointer diff --git a/vortex-cuda/cub/kernels/filter.cu b/vortex-cuda/cub/kernels/filter.cu index 756bbf3c23f..73726b3a4a4 100644 --- a/vortex-cuda/cub/kernels/filter.cu +++ b/vortex-cuda/cub/kernels/filter.cu @@ -188,3 +188,33 @@ DEFINE_FILTER_BITMASK(f32, float) DEFINE_FILTER_BITMASK(f64, double) DEFINE_FILTER_BITMASK(i128, __int128_t) DEFINE_FILTER_BITMASK(i256, __int256_t) + +// Query CUB temporary storage for an exclusive-sum scan. +template +static cudaError_t scan_exclusive_sum_temp_size_impl(size_t *temp_bytes, int64_t num_items) { + size_t bytes = 0; + cudaError_t err = cub::DeviceScan::ExclusiveSum(nullptr, + bytes, + static_cast(nullptr), + static_cast(nullptr), + num_items); + *temp_bytes = bytes; + return err; +} + +// Export one temp-size query and scan launch per supported element type. +#define DEFINE_SCAN_EXCLUSIVE_SUM(suffix, Type) \ + extern "C" cudaError_t scan_exclusive_sum_##suffix##_temp_size(size_t *temp_bytes, int64_t num_items) { \ + return scan_exclusive_sum_temp_size_impl(temp_bytes, num_items); \ + } \ + extern "C" cudaError_t scan_exclusive_sum_##suffix(void *d_temp, \ + size_t temp_bytes, \ + const Type *d_in, \ + Type *d_out, \ + int64_t num_items, \ + cudaStream_t stream) { \ + return cub::DeviceScan::ExclusiveSum(d_temp, temp_bytes, d_in, d_out, num_items, stream); \ + } + +DEFINE_SCAN_EXCLUSIVE_SUM(i32, int32_t) +DEFINE_SCAN_EXCLUSIVE_SUM(i64, int64_t) diff --git a/vortex-cuda/cub/kernels/filter.h b/vortex-cuda/cub/kernels/filter.h index 354b877dc17..c49dc62faed 100644 --- a/vortex-cuda/cub/kernels/filter.h +++ b/vortex-cuda/cub/kernels/filter.h @@ -81,6 +81,24 @@ FILTER_TYPE_TABLE(DECLARE_FILTER_BITMASK) #undef DECLARE_FILTER_BITMASK +cudaError_t scan_exclusive_sum_i32_temp_size(size_t *temp_bytes, int64_t num_items); + +cudaError_t scan_exclusive_sum_i32(void *d_temp, + size_t temp_bytes, + const int32_t *d_in, + int32_t *d_out, + int64_t num_items, + cudaStream_t stream); + +cudaError_t scan_exclusive_sum_i64_temp_size(size_t *temp_bytes, int64_t num_items); + +cudaError_t scan_exclusive_sum_i64(void *d_temp, + size_t temp_bytes, + const int64_t *d_in, + int64_t *d_out, + int64_t num_items, + cudaStream_t stream); + #ifdef __cplusplus } #endif diff --git a/vortex-cuda/cub/src/lib.rs b/vortex-cuda/cub/src/lib.rs index c0532576604..ae8e3b52b0f 100644 --- a/vortex-cuda/cub/src/lib.rs +++ b/vortex-cuda/cub/src/lib.rs @@ -23,6 +23,7 @@ pub mod sys; mod error; pub mod filter; +pub mod scan; pub use error::CubError; diff --git a/vortex-cuda/cub/src/scan.rs b/vortex-cuda/cub/src/scan.rs new file mode 100644 index 00000000000..56b4e64b857 --- /dev/null +++ b/vortex-cuda/cub/src/scan.rs @@ -0,0 +1,71 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! Rust wrappers around CUB DeviceScan operations used by CUDA kernels. + +use std::ffi::c_void; + +use crate::cub_library; +use crate::error::CubError; +use crate::error::check_cuda_error; +pub use crate::sys::cudaStream_t; + +/// Get temporary storage size for CUB `DeviceScan::ExclusiveSum`. +pub fn exclusive_sum_i32_temp_size(num_items: i64) -> Result { + let lib = cub_library()?; + let mut temp_bytes: usize = 0; + let err = unsafe { (lib.scan_exclusive_sum_i32_temp_size)(&raw mut temp_bytes, num_items) }; + check_cuda_error(err, "scan_exclusive_sum_i32_temp_size")?; + Ok(temp_bytes) +} + +/// Execute CUB `DeviceScan::ExclusiveSum`. +/// +/// # Safety +/// +/// All device pointers must be valid and properly sized: +/// - `d_temp` must have at least `temp_bytes` bytes allocated. +/// - `d_in` and `d_out` must have at least `num_items` `i32` values. +pub unsafe fn exclusive_sum_i32( + d_temp: *mut c_void, + temp_bytes: usize, + d_in: *const i32, + d_out: *mut i32, + num_items: i64, + stream: cudaStream_t, +) -> Result<(), CubError> { + let lib = cub_library()?; + let err = + unsafe { (lib.scan_exclusive_sum_i32)(d_temp, temp_bytes, d_in, d_out, num_items, stream) }; + check_cuda_error(err, "scan_exclusive_sum_i32") +} + +/// Get temporary storage size for CUB `DeviceScan::ExclusiveSum`. +pub fn exclusive_sum_i64_temp_size(num_items: i64) -> Result { + let lib = cub_library()?; + let mut temp_bytes: usize = 0; + let err = unsafe { (lib.scan_exclusive_sum_i64_temp_size)(&raw mut temp_bytes, num_items) }; + check_cuda_error(err, "scan_exclusive_sum_i64_temp_size")?; + Ok(temp_bytes) +} + +/// Execute CUB `DeviceScan::ExclusiveSum`. +/// +/// # Safety +/// +/// All device pointers must be valid and properly sized: +/// - `d_temp` must have at least `temp_bytes` bytes allocated. +/// - `d_in` and `d_out` must have at least `num_items` `i64` values. +pub unsafe fn exclusive_sum_i64( + d_temp: *mut c_void, + temp_bytes: usize, + d_in: *const i64, + d_out: *mut i64, + num_items: i64, + stream: cudaStream_t, +) -> Result<(), CubError> { + let lib = cub_library()?; + let err = + unsafe { (lib.scan_exclusive_sum_i64)(d_temp, temp_bytes, d_in, d_out, num_items, stream) }; + check_cuda_error(err, "scan_exclusive_sum_i64") +} diff --git a/vortex-cuda/kernels/src/list_view.cu b/vortex-cuda/kernels/src/list_view.cu new file mode 100644 index 00000000000..4223bc976f9 --- /dev/null +++ b/vortex-cuda/kernels/src/list_view.cu @@ -0,0 +1,248 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#include "config.cuh" + +#include +#include + +namespace { + +template +__device__ bool non_negative_to_u64(T value, uint64_t *out) { + if constexpr (std::is_signed_v) { + if (value < 0) { + return false; + } + } + + *out = static_cast(value); + return true; +} + +__device__ bool checked_add_u64(uint64_t lhs, uint64_t rhs, uint64_t *out) { + if (rhs > UINT64_MAX - lhs) { + return false; + } + + *out = lhs + rhs; + return true; +} + +// Assumes `ListViewArray` construction invariants for basic metadata validity. This kernel only +// decides whether the views are already contiguous Arrow `List` offsets and fit cuDF's i32 limit. +template +__device__ void list_view_offsets_device(const OffsetT *const offsets, + const SizeT *const sizes, + int32_t *const output, + uint32_t *const status, + uint64_t list_len) { + const uint64_t worker = blockIdx.x * blockDim.x + threadIdx.x; + const uint64_t startElem = start_elem(worker, list_len); + const uint64_t stopElem = stop_elem(worker, list_len); + + for (uint64_t idx = startElem; idx < stopElem; idx++) { + const uint64_t offset = static_cast(offsets[idx]); + const uint64_t end = offset + static_cast(sizes[idx]); + output[idx] = static_cast(offset); + + if (end < offset || end > static_cast(INT32_MAX)) { + atomicMax(status, 2u); + } + if (idx == 0 && offset != 0) { + atomicMax(status, 1u); + } + + if (idx + 1 == list_len) { + output[list_len] = static_cast(end); + } else if (static_cast(offsets[idx + 1]) != end) { + atomicMax(status, 1u); + } + } +} + +template +__device__ void list_view_rebuild_init_scan_device(const SizeT *const sizes, + int32_t *const scan, + uint32_t *const status, + uint64_t list_len, + uint64_t scan_len) { + const uint64_t worker = blockIdx.x * blockDim.x + threadIdx.x; + const uint64_t startElem = start_elem(worker, scan_len); + const uint64_t stopElem = stop_elem(worker, scan_len); + + for (uint64_t idx = startElem; idx < stopElem; idx++) { + if (idx >= list_len) { + scan[idx] = 0; + continue; + } + + uint64_t size = 0; + if (!non_negative_to_u64(sizes[idx], &size)) { + atomicMax(status, 1u); + scan[idx] = 0; + } else if (size > static_cast(INT32_MAX)) { + atomicMax(status, 2u); + scan[idx] = 0; + } else { + scan[idx] = static_cast(size); + } + } +} + +template +__device__ void list_view_rebuild_validate_offsets_device(const SizeT *const sizes, + const int32_t *const output_offsets, + uint32_t *const status, + uint64_t list_len) { + const uint64_t worker = blockIdx.x * blockDim.x + threadIdx.x; + const uint64_t startElem = start_elem(worker, list_len); + const uint64_t stopElem = stop_elem(worker, list_len); + + for (uint64_t idx = startElem; idx < stopElem; idx++) { + const int32_t offset = output_offsets[idx]; + const int32_t next_offset = output_offsets[idx + 1]; + if (offset < 0 || next_offset < 0) { + atomicMax(status, 2u); + continue; + } + + uint64_t size = 0; + if (!non_negative_to_u64(sizes[idx], &size)) { + atomicMax(status, 1u); + continue; + } + + const int64_t expected_next = static_cast(offset) + static_cast(size); + if (size > static_cast(INT32_MAX) || expected_next != static_cast(next_offset)) { + atomicMax(status, 2u); + } + } +} + +template +__device__ void list_view_rebuild_primitive_device(const OffsetT *const offsets, + const SizeT *const sizes, + const int32_t *const output_offsets, + const uint8_t *const input_values, + uint8_t *const output_values, + uint32_t *const status, + uint64_t list_len, + uint64_t elements_len, + uint64_t value_width) { + const uint64_t worker = blockIdx.x * blockDim.x + threadIdx.x; + const uint64_t startElem = start_elem(worker, list_len); + const uint64_t stopElem = stop_elem(worker, list_len); + + for (uint64_t list_idx = startElem; list_idx < stopElem; list_idx++) { + uint64_t input_offset = 0; + uint64_t size = 0; + if (!non_negative_to_u64(offsets[list_idx], &input_offset) || + !non_negative_to_u64(sizes[list_idx], &size)) { + atomicMax(status, 1u); + continue; + } + + uint64_t input_end = 0; + if (!checked_add_u64(input_offset, size, &input_end) || input_end > elements_len) { + atomicMax(status, 1u); + continue; + } + + const uint64_t output_idx = static_cast(output_offsets[list_idx]); + for (uint64_t element_idx = 0; element_idx < size; element_idx++) { + const uint64_t input_byte = (input_offset + element_idx) * value_width; + const uint64_t output_byte = (output_idx + element_idx) * value_width; + for (uint64_t byte_idx = 0; byte_idx < value_width; byte_idx++) { + output_values[output_byte + byte_idx] = input_values[input_byte + byte_idx]; + } + } + } +} + +} // namespace + +#define GENERATE_VALIDATE_OFFSETS(SizeT, size_suffix) \ + extern "C" __global__ void list_view_rebuild_validate_offsets_##size_suffix( \ + const SizeT *const sizes, \ + const int32_t *const output_offsets, \ + uint32_t *const status, \ + uint64_t list_len) { \ + list_view_rebuild_validate_offsets_device(sizes, output_offsets, status, list_len); \ + } + +#define GENERATE_KERNEL(OffsetT, offset_suffix, SizeT, size_suffix) \ + extern "C" __global__ void list_view_offsets_##offset_suffix##_##size_suffix( \ + const OffsetT *const offsets, \ + const SizeT *const sizes, \ + int32_t *const output, \ + uint32_t *const status, \ + uint64_t list_len) { \ + list_view_offsets_device(offsets, sizes, output, status, list_len); \ + } \ + extern "C" __global__ void list_view_rebuild_primitive_##offset_suffix##_##size_suffix( \ + const OffsetT *const offsets, \ + const SizeT *const sizes, \ + const int32_t *const output_offsets, \ + const uint8_t *const input_values, \ + uint8_t *const output_values, \ + uint32_t *const status, \ + uint64_t list_len, \ + uint64_t elements_len, \ + uint64_t value_width) { \ + list_view_rebuild_primitive_device(offsets, \ + sizes, \ + output_offsets, \ + input_values, \ + output_values, \ + status, \ + list_len, \ + elements_len, \ + value_width); \ + } + +#define GENERATE_INIT_SCAN(SizeT, size_suffix) \ + extern "C" __global__ void list_view_rebuild_init_scan_##size_suffix(const SizeT *const sizes, \ + int32_t *const scan, \ + uint32_t *const status, \ + uint64_t list_len, \ + uint64_t scan_len) { \ + list_view_rebuild_init_scan_device(sizes, scan, status, list_len, scan_len); \ + } + +#define GENERATE_SIZE_KERNELS(OffsetT, offset_suffix) \ + GENERATE_KERNEL(OffsetT, offset_suffix, uint8_t, u8) \ + GENERATE_KERNEL(OffsetT, offset_suffix, uint16_t, u16) \ + GENERATE_KERNEL(OffsetT, offset_suffix, uint32_t, u32) \ + GENERATE_KERNEL(OffsetT, offset_suffix, uint64_t, u64) \ + GENERATE_KERNEL(OffsetT, offset_suffix, int8_t, i8) \ + GENERATE_KERNEL(OffsetT, offset_suffix, int16_t, i16) \ + GENERATE_KERNEL(OffsetT, offset_suffix, int32_t, i32) \ + GENERATE_KERNEL(OffsetT, offset_suffix, int64_t, i64) + +GENERATE_INIT_SCAN(uint8_t, u8) +GENERATE_INIT_SCAN(uint16_t, u16) +GENERATE_INIT_SCAN(uint32_t, u32) +GENERATE_INIT_SCAN(uint64_t, u64) +GENERATE_INIT_SCAN(int8_t, i8) +GENERATE_INIT_SCAN(int16_t, i16) +GENERATE_INIT_SCAN(int32_t, i32) +GENERATE_INIT_SCAN(int64_t, i64) + +GENERATE_VALIDATE_OFFSETS(uint8_t, u8) +GENERATE_VALIDATE_OFFSETS(uint16_t, u16) +GENERATE_VALIDATE_OFFSETS(uint32_t, u32) +GENERATE_VALIDATE_OFFSETS(uint64_t, u64) +GENERATE_VALIDATE_OFFSETS(int8_t, i8) +GENERATE_VALIDATE_OFFSETS(int16_t, i16) +GENERATE_VALIDATE_OFFSETS(int32_t, i32) +GENERATE_VALIDATE_OFFSETS(int64_t, i64) + +GENERATE_SIZE_KERNELS(uint8_t, u8) +GENERATE_SIZE_KERNELS(uint16_t, u16) +GENERATE_SIZE_KERNELS(uint32_t, u32) +GENERATE_SIZE_KERNELS(uint64_t, u64) +GENERATE_SIZE_KERNELS(int8_t, i8) +GENERATE_SIZE_KERNELS(int16_t, i16) +GENERATE_SIZE_KERNELS(int32_t, i32) +GENERATE_SIZE_KERNELS(int64_t, i64) diff --git a/vortex-cuda/src/arrow/canonical.rs b/vortex-cuda/src/arrow/canonical.rs index 8d8b3779ec4..9d0f2e3bb60 100644 --- a/vortex-cuda/src/arrow/canonical.rs +++ b/vortex-cuda/src/arrow/canonical.rs @@ -8,25 +8,37 @@ use async_trait::async_trait; use futures::future::BoxFuture; use vortex::array::ArrayRef; use vortex::array::Canonical; +use vortex::array::arrays::FixedSizeListArray; +use vortex::array::arrays::ListArray; use vortex::array::arrays::PrimitiveArray; use vortex::array::arrays::StructArray; use vortex::array::arrays::bool::BoolDataParts; use vortex::array::arrays::decimal::DecimalDataParts; use vortex::array::arrays::extension::ExtensionArrayExt; +use vortex::array::arrays::fixed_size_list::FixedSizeListArrayExt; +use vortex::array::arrays::fixed_size_list::FixedSizeListDataParts; +use vortex::array::arrays::list::ListDataParts; +use vortex::array::arrays::listview::list_from_list_view; use vortex::array::arrays::primitive::PrimitiveDataParts; use vortex::array::arrays::struct_::StructDataParts; use vortex::array::arrays::varbinview::VarBinViewDataParts; use vortex::array::buffer::BufferHandle; +use vortex::array::builtins::ArrayBuiltins; use vortex::array::validity::Validity; use vortex::buffer::Buffer; use vortex::buffer::ByteBuffer; +use vortex::dtype::DType; use vortex::dtype::DecimalType; +use vortex::dtype::Nullability; +use vortex::dtype::PType; use vortex::error::VortexResult; use vortex::error::vortex_bail; use vortex::error::vortex_ensure; +use vortex::error::vortex_err; use vortex::extension::datetime::AnyTemporal; use vortex::mask::Mask; +use super::list_view::export_device_list_view; use crate::CudaExecutionCtx; use crate::arrow::ARROW_DEVICE_CUDA; use crate::arrow::ArrowArray; @@ -149,6 +161,25 @@ fn export_canonical( let bits = ctx.ensure_on_device(bits).await?; export_fixed_size(bits, len, offset, validity_buffer, null_count, ctx) } + Canonical::List(listview) => { + // cuDF imports standard Arrow `List`, while Vortex canonical lists are list-views. + // Try the GPU path first; host list-views can fall back to a CPU rebuild. + let is_host = listview.as_ref().is_host(); + let gpu_err = match export_device_list_view(listview.clone(), ctx).await { + Ok(exported) => return Ok(exported), + Err(err) => err, + }; + + // CPU rebuild requires host-resident buffers; device-resident arrays keep the GPU error. + if !is_host { + return Err(gpu_err); + } + + export_list(list_from_list_view(listview)?, ctx).await + } + Canonical::FixedSizeList(fixed_size_list) => { + export_fixed_size_list(fixed_size_list, ctx).await + } Canonical::VarBinView(varbinview) => { let len = varbinview.len(); let VarBinViewDataParts { @@ -209,7 +240,7 @@ fn export_canonical( /// Export Vortex validity as an Arrow validity byte buffer. /// /// Returns `None` for the buffer when Arrow can omit validity because all rows are valid. -async fn export_arrow_validity_buffer( +pub(super) async fn export_arrow_validity_buffer( validity: Validity, len: usize, arrow_offset: usize, @@ -232,6 +263,140 @@ async fn export_arrow_validity_buffer( Ok((Some(validity), null_count)) } +/// Export a standard Vortex list as Arrow `List`: validity, offsets, and one child array. +async fn export_list( + array: ListArray, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + let len = array.len(); + let ListDataParts { + elements, + offsets, + validity, + .. + } = array.into_data_parts(); + + let (validity_buffer, null_count) = export_arrow_validity_buffer(validity, len, 0, ctx).await?; + let offsets_buffer = export_arrow_list_offsets(offsets, ctx).await?; + + export_list_layout( + elements, + len, + validity_buffer, + null_count, + offsets_buffer, + ctx, + ) + .await +} + +/// Build the shared Arrow `List` parent once offsets and validity are ready on device. +pub(super) async fn export_list_layout( + elements: ArrayRef, + len: usize, + validity_buffer: Option, + null_count: i64, + offsets_buffer: BufferHandle, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + let cuda_elements = elements.execute_cuda(ctx).await?; + let (elements_child, _) = export_canonical(cuda_elements, ctx).await?; + + let mut private_data = PrivateData::new( + vec![validity_buffer, Some(offsets_buffer)], + vec![elements_child], + ctx, + )?; + let sync_event = private_data.sync_event(); + + let mut arrow_list = ArrowArray::empty(); + arrow_list.length = len as i64; + arrow_list.null_count = null_count; + arrow_list.n_buffers = 2; + arrow_list.buffers = private_data.buffer_ptrs.as_mut_ptr(); + arrow_list.n_children = 1; + arrow_list.children = private_data.children.as_mut_ptr(); + arrow_list.release = Some(release_array); + arrow_list.private_data = Box::into_raw(private_data).cast(); + + Ok((arrow_list, sync_event)) +} + +/// Export a Vortex fixed-size-list as Arrow `List`. +/// +/// cuDF's Arrow Device import accepts `List`/`LargeList` as cuDF `LIST`, but rejects +/// `FixedSizeList`, so emit equivalent standard Arrow `List` offsets. +async fn export_fixed_size_list( + array: FixedSizeListArray, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + let len = array.len(); + let list_size = array.list_size(); + let FixedSizeListDataParts { + elements, validity, .. + } = array.into_data_parts(); + + let (validity_buffer, null_count) = export_arrow_validity_buffer(validity, len, 0, ctx).await?; + let offsets_buffer = fixed_size_list_offsets(len, list_size, ctx).await?; + + export_list_layout( + elements, + len, + validity_buffer, + null_count, + offsets_buffer, + ctx, + ) + .await +} + +async fn fixed_size_list_offsets( + len: usize, + list_size: u32, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let list_size = i32::try_from(list_size).map_err(|_| { + vortex_err!( + "cannot export FixedSizeList with list size {list_size}: Arrow List offsets require i32" + ) + })?; + let offsets = (0..=i32::try_from(len)?) + .map(|idx| { + idx.checked_mul(list_size) + .ok_or_else(|| vortex_err!("FixedSizeList Arrow List offsets exceed i32 range")) + }) + .collect::>>()?; + + ctx.ensure_on_device(BufferHandle::new_host( + Buffer::from(offsets).into_byte_buffer(), + )) + .await +} + +/// Return cuDF-supported Arrow `List` offsets as an `i32` device buffer. +async fn export_arrow_list_offsets( + offsets: ArrayRef, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let offsets = if offsets.dtype().as_ptype() == PType::I32 { + offsets + } else { + offsets.cast(DType::Primitive(PType::I32, Nullability::NonNullable))? + }; + let offsets = offsets.execute_cuda(ctx).await?; + let Canonical::Primitive(offsets) = offsets else { + vortex_bail!("list offsets must be primitive, got {}", offsets.dtype()); + }; + + let PrimitiveDataParts { ptype, buffer, .. } = offsets.into_data_parts(); + vortex_ensure!( + ptype == PType::I32, + "list offsets cast to i32 produced {ptype}" + ); + + ctx.ensure_on_device(buffer).await +} + async fn export_struct( array: StructArray, ctx: &mut CudaExecutionCtx, @@ -357,11 +522,15 @@ mod tests { use vortex::array::IntoArray; use vortex::array::arrays::BoolArray; use vortex::array::arrays::DecimalArray; + use vortex::array::arrays::FixedSizeListArray; + use vortex::array::arrays::ListArray; + use vortex::array::arrays::ListViewArray; use vortex::array::arrays::NullArray; use vortex::array::arrays::PrimitiveArray; use vortex::array::arrays::StructArray; use vortex::array::arrays::TemporalArray; use vortex::array::arrays::VarBinViewArray; + use vortex::array::arrays::primitive::PrimitiveArrayExt; use vortex::array::arrays::varbinview::BinaryView; use vortex::array::validity::Validity; use vortex::buffer::Buffer; @@ -369,10 +538,13 @@ mod tests { use vortex::dtype::DType; use vortex::dtype::DecimalDType; use vortex::dtype::FieldNames; + use vortex::dtype::NativePType; use vortex::dtype::Nullability; + use vortex::dtype::PType; use vortex::dtype::half::f16; use vortex::error::VortexExpect; use vortex::error::VortexResult; + use vortex::error::vortex_bail; use vortex::extension::datetime::TimeUnit; use vortex::session::VortexSession; @@ -516,6 +688,74 @@ mod tests { (array, buffer_lengths) } + async fn primitive_on_device( + values: impl IntoIterator, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + let primitive = PrimitiveArray::from_iter(values); + let handle = ctx + .ensure_on_device(primitive.buffer_handle().clone()) + .await?; + Ok( + PrimitiveArray::from_buffer_handle(handle, T::PTYPE, Validity::NonNullable) + .into_array(), + ) + } + + async fn primitive_i32_on_device( + values: impl IntoIterator, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + primitive_on_device(values, ctx).await + } + + #[expect(clippy::cast_possible_truncation)] + async fn integer_array_on_device( + ptype: PType, + values: &[i64], + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + match ptype { + PType::U8 => primitive_on_device(values.iter().map(|&value| value as u8), ctx).await, + PType::U16 => primitive_on_device(values.iter().map(|&value| value as u16), ctx).await, + PType::U32 => primitive_on_device(values.iter().map(|&value| value as u32), ctx).await, + PType::U64 => primitive_on_device(values.iter().map(|&value| value as u64), ctx).await, + PType::I8 => primitive_on_device(values.iter().map(|&value| value as i8), ctx).await, + PType::I16 => primitive_on_device(values.iter().map(|&value| value as i16), ctx).await, + PType::I32 => primitive_on_device(values.iter().map(|&value| value as i32), ctx).await, + PType::I64 => primitive_on_device(values.iter().copied(), ctx).await, + ptype => vortex_bail!("test helper only supports integer PTypes, got {ptype}"), + } + } + + async fn nullable_primitive_i32_on_device( + values: impl IntoIterator>, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + let primitive = PrimitiveArray::from_option_iter(values); + let handle = ctx + .ensure_on_device(primitive.buffer_handle().clone()) + .await?; + Ok( + PrimitiveArray::from_buffer_handle(handle, PType::I32, primitive.validity()?) + .into_array(), + ) + } + + fn private_data_buffer_i32_values( + array: &ArrowArray, + buffer_idx: usize, + ) -> VortexResult> { + let private_data = unsafe { &*array.private_data.cast::() }; + let buffer = private_data.buffers[buffer_idx] + .as_ref() + .vortex_expect("buffer should be present"); + Ok(Buffer::::from_byte_buffer(buffer.to_host_sync()) + .iter() + .copied() + .collect()) + } + // Build a nested struct fixture with an out-of-line string-view value. fn nested_struct_array() -> ArrayRef { let nested = StructArray::new( @@ -712,6 +952,154 @@ mod tests { Ok(()) } + #[crate::test] + async fn test_export_list() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let array = ListArray::try_new( + PrimitiveArray::from_iter(0i32..5).into_array(), + PrimitiveArray::from_iter([0i32, 2, 2, 5]).into_array(), + Validity::NonNullable, + )? + .into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let field = Field::try_from(&exported.schema)?; + assert_eq!( + field, + Field::new_list( + "", + Field::new(Field::LIST_FIELD_DEFAULT_NAME, DataType::Int32, false), + false, + ) + ); + assert_eq!(exported.array.array.length, 3); + assert_eq!(exported.array.array.null_count, 0); + assert_eq!(exported.array.array.n_buffers, 2); + let buffers = unsafe { std::slice::from_raw_parts(exported.array.array.buffers, 2) }; + assert!(buffers[0].is_null()); + assert!(!buffers[1].is_null()); + assert_eq!(exported.array.array.n_children, 1); + let children = unsafe { std::slice::from_raw_parts(exported.array.array.children, 1) }; + let elements = unsafe { &*children[0] }; + assert_eq!(elements.length, 5); + assert_eq!(elements.n_buffers, 2); + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + + #[crate::test] + async fn test_export_host_contiguous_list_view() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let array = ListViewArray::new( + PrimitiveArray::from_iter(0i32..5).into_array(), + PrimitiveArray::from_iter([0i32, 2, 2]).into_array(), + PrimitiveArray::from_iter([2i32, 0, 3]).into_array(), + Validity::NonNullable, + ) + .into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + assert_eq!(exported.array.array.length, 3); + assert_eq!(exported.array.array.n_buffers, 2); + assert_eq!( + private_data_buffer_i32_values(&exported.array.array, 1)?, + [0, 2, 2, 5] + ); + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + + #[crate::test] + async fn test_export_host_non_contiguous_nested_list_view_falls_back_to_cpu() -> VortexResult<()> + { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let elements = StructArray::new( + FieldNames::from_iter(["x"]), + vec![PrimitiveArray::from_iter(0i32..4).into_array()], + 4, + Validity::NonNullable, + ) + .into_array(); + let array = ListViewArray::new( + elements, + PrimitiveArray::from_iter([0i32, 1]).into_array(), + PrimitiveArray::from_iter([3i32, 2]).into_array(), + Validity::NonNullable, + ) + .into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + assert_eq!(exported.array.array.length, 2); + assert_eq!( + private_data_buffer_i32_values(&exported.array.array, 1)?, + [0, 3, 5] + ); + let list_children = unsafe { std::slice::from_raw_parts(exported.array.array.children, 1) }; + let struct_child = unsafe { &*list_children[0] }; + assert_eq!(struct_child.length, 5); + let struct_children = unsafe { std::slice::from_raw_parts(struct_child.children, 1) }; + let field_child = unsafe { &*struct_children[0] }; + assert_eq!( + private_data_buffer_i32_values(field_child, 1)?, + [0, 1, 2, 1, 2] + ); + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + + #[rstest] + #[case::i32_i32(PType::I32, PType::I32)] + #[case::u32_u16(PType::U32, PType::U16)] + #[case::i64_u8(PType::I64, PType::U8)] + #[case::u64_i16(PType::U64, PType::I16)] + #[crate::test] + async fn test_export_device_contiguous_list_view( + #[case] offsets_ptype: PType, + #[case] sizes_ptype: PType, + ) -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let elements = primitive_i32_on_device(0..5, &mut ctx).await?; + let offsets = integer_array_on_device(offsets_ptype, &[0, 2, 2], &mut ctx).await?; + let sizes = integer_array_on_device(sizes_ptype, &[2, 0, 3], &mut ctx).await?; + let array = + ListViewArray::new(elements, offsets, sizes, Validity::NonNullable).into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let field = Field::try_from(&exported.schema)?; + assert_eq!( + field, + Field::new_list( + "", + Field::new(Field::LIST_FIELD_DEFAULT_NAME, DataType::Int32, false), + false, + ) + ); + assert_eq!(exported.array.array.length, 3); + assert_eq!(exported.array.array.n_buffers, 2); + assert_eq!( + private_data_buffer_i32_values(&exported.array.array, 1)?, + [0, 2, 2, 5] + ); + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + #[rstest] #[case::utf8( multi_buffer_varbinview(DType::Utf8(Nullability::NonNullable)), @@ -741,6 +1129,213 @@ mod tests { Ok(()) } + #[rstest] + #[case::i64(PrimitiveArray::from_iter([0i64, 2, 2, 5]).into_array())] + #[case::u64(PrimitiveArray::from_iter([0u64, 2, 2, 5]).into_array())] + #[crate::test] + async fn test_export_list_with_non_i32_offsets(#[case] offsets: ArrayRef) -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let array = ListArray::try_new( + PrimitiveArray::from_iter(0i32..5).into_array(), + offsets, + Validity::NonNullable, + )? + .into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + assert_eq!(exported.array.array.length, 3); + assert_eq!(exported.array.array.n_buffers, 2); + assert_eq!( + private_data_buffer_i32_values(&exported.array.array, 1)?, + [0, 2, 2, 5] + ); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + + #[rstest] + #[case::i32_i32(PType::I32, PType::I32)] + #[case::u32_u16(PType::U32, PType::U16)] + #[case::i64_u8(PType::I64, PType::U8)] + #[case::u64_i16(PType::U64, PType::I16)] + #[crate::test] + async fn test_export_device_non_contiguous_primitive_list_view( + #[case] offsets_ptype: PType, + #[case] sizes_ptype: PType, + ) -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let elements = primitive_i32_on_device([10, 11, 12, 13, 14], &mut ctx).await?; + let offsets = integer_array_on_device(offsets_ptype, &[3, 0, 2], &mut ctx).await?; + let sizes = integer_array_on_device(sizes_ptype, &[2, 2, 1], &mut ctx).await?; + let array = + ListViewArray::new(elements, offsets, sizes, Validity::NonNullable).into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + assert_eq!(exported.array.array.length, 3); + assert_eq!( + private_data_buffer_i32_values(&exported.array.array, 1)?, + [0, 2, 4, 5] + ); + let children = unsafe { std::slice::from_raw_parts(exported.array.array.children, 1) }; + let elements = unsafe { &*children[0] }; + assert_eq!( + private_data_buffer_i32_values(elements, 1)?, + [13, 14, 10, 11, 12] + ); + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + + #[rstest] + #[case::out_of_bounds(&[3], &[2], "offsets/sizes are invalid")] + #[case::negative_offset(&[-1], &[1], "offsets exceed i32 range")] + #[crate::test] + async fn test_export_device_invalid_list_view_returns_error( + #[case] offsets_values: &[i64], + #[case] sizes_values: &[i64], + #[case] expected_error: &str, + ) -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let elements = primitive_i32_on_device(0..4, &mut ctx).await?; + let offsets = integer_array_on_device(PType::I32, offsets_values, &mut ctx).await?; + let sizes = integer_array_on_device(PType::I32, sizes_values, &mut ctx).await?; + let array = unsafe { + ListViewArray::new_unchecked(elements, offsets, sizes, Validity::NonNullable) + } + .into_array(); + let err = match array.export_device_array(&mut ctx).await { + Ok(mut exported) => { + unsafe { release_exported_array(&raw mut exported.array) }; + vortex_bail!("invalid device list view should be unsupported") + } + Err(err) => err, + }; + + assert!( + err.to_string().contains(expected_error), + "unexpected error: {err}" + ); + Ok(()) + } + + #[crate::test] + async fn test_export_device_non_contiguous_nested_list_view_returns_error() -> VortexResult<()> + { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let field = primitive_i32_on_device(0..4, &mut ctx).await?; + let elements = StructArray::new( + FieldNames::from_iter(["x"]), + vec![field], + 4, + Validity::NonNullable, + ) + .into_array(); + let offsets = primitive_i32_on_device([0, 1], &mut ctx).await?; + let sizes = primitive_i32_on_device([3, 2], &mut ctx).await?; + let array = + ListViewArray::new(elements, offsets, sizes, Validity::NonNullable).into_array(); + let err = match array.export_device_array(&mut ctx).await { + Ok(mut exported) => { + unsafe { release_exported_array(&raw mut exported.array) }; + vortex_bail!("non-contiguous nested list view should be unsupported") + } + Err(err) => err, + }; + + assert!( + err.to_string() + .contains("GPU child rebuild only supports primitive children"), + "unexpected error: {err}" + ); + Ok(()) + } + + #[crate::test] + async fn test_export_device_non_contiguous_nullable_primitive_list_view_returns_error() + -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let elements = nullable_primitive_i32_on_device( + [Some(10), None, Some(12), Some(13), Some(14)], + &mut ctx, + ) + .await?; + let offsets = primitive_i32_on_device([3, 0, 2], &mut ctx).await?; + let sizes = primitive_i32_on_device([2, 2, 1], &mut ctx).await?; + let array = + ListViewArray::new(elements, offsets, sizes, Validity::NonNullable).into_array(); + let err = match array.export_device_array(&mut ctx).await { + Ok(mut exported) => { + unsafe { release_exported_array(&raw mut exported.array) }; + vortex_bail!("non-contiguous nullable primitive list view should be unsupported") + } + Err(err) => err, + }; + + assert!( + err.to_string() + .contains("GPU child validity rebuild is not implemented"), + "unexpected error: {err}" + ); + Ok(()) + } + + #[crate::test] + async fn test_export_fixed_size_list_as_list() -> VortexResult<()> { + let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create execution context"); + + let array = FixedSizeListArray::new( + PrimitiveArray::from_iter(0i32..6).into_array(), + 2, + Validity::NonNullable, + 3, + ) + .into_array(); + let mut exported = array.export_device_array_with_schema(&mut ctx).await?; + + let field = Field::try_from(&exported.schema)?; + assert_eq!( + field, + Field::new_list( + "", + Field::new(Field::LIST_FIELD_DEFAULT_NAME, DataType::Int32, false), + false, + ) + ); + assert_eq!(exported.array.array.length, 3); + assert_eq!(exported.array.array.null_count, 0); + assert_eq!(exported.array.array.n_buffers, 2); + let buffers = unsafe { std::slice::from_raw_parts(exported.array.array.buffers, 2) }; + assert!(buffers[0].is_null()); + assert!(!buffers[1].is_null()); + assert_eq!( + private_data_buffer_i32_values(&exported.array.array, 1)?, + [0, 2, 4, 6] + ); + assert_eq!(exported.array.array.n_children, 1); + let children = unsafe { std::slice::from_raw_parts(exported.array.array.children, 1) }; + let elements = unsafe { &*children[0] }; + assert_eq!(elements.length, 6); + assert_eq!(elements.n_buffers, 2); + assert_eq!(exported.array.device_type, ARROW_DEVICE_CUDA); + + unsafe { release_exported_array(&raw mut exported.array.array) }; + Ok(()) + } + // Check device metadata for data-bearing and metadata-only exports. #[crate::test] async fn test_export_device_metadata() -> VortexResult<()> { diff --git a/vortex-cuda/src/arrow/list_view.rs b/vortex-cuda/src/arrow/list_view.rs new file mode 100644 index 00000000000..f8a0be2f047 --- /dev/null +++ b/vortex-cuda/src/arrow/list_view.rs @@ -0,0 +1,444 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! CUDA Arrow Device export helpers for Vortex `ListViewArray`. + +use std::sync::Arc; + +use cudarc::driver::CudaSlice; +use cudarc::driver::DeviceRepr; +use cudarc::driver::PushKernelArg; +use vortex::array::ArrayRef; +use vortex::array::Canonical; +use vortex::array::IntoArray; +use vortex::array::arrays::ListViewArray; +use vortex::array::arrays::PrimitiveArray; +use vortex::array::arrays::listview::ListViewDataParts; +use vortex::array::arrays::primitive::PrimitiveDataParts; +use vortex::array::buffer::BufferHandle; +use vortex::array::match_each_integer_ptype; +use vortex::buffer::Buffer; +use vortex::dtype::NativePType; +use vortex::dtype::PType; +use vortex::error::VortexResult; +use vortex::error::vortex_bail; +use vortex::error::vortex_ensure; +use vortex::error::vortex_err; + +use super::ArrowArray; +use super::SyncEvent; +use super::canonical::export_arrow_validity_buffer; +use super::canonical::export_list_layout; +use crate::CudaBufferExt; +use crate::CudaDeviceBuffer; +use crate::CudaExecutionCtx; +use crate::cub::exclusive_sum_i32; +use crate::executor::CudaArrayExt; + +/// Export a Vortex list-view as Arrow `List` using device kernels. +/// +/// Contiguous list-views reuse their child elements. Non-contiguous list-views are rebuilt on GPU +/// only when the child is primitive and non-nullable/non-null; other child shapes are rejected. +pub(super) async fn export_device_list_view( + array: ListViewArray, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + let len = array.len(); + let ListViewDataParts { + elements, + offsets, + sizes, + validity, + .. + } = array.into_data_parts(); + + let (validity_buffer, null_count) = export_arrow_validity_buffer(validity, len, 0, ctx).await?; + + let (offsets_ptype, offsets_buffer) = + primitive_device_buffer(offsets, "list offsets", ctx).await?; + let (sizes_ptype, sizes_buffer) = primitive_device_buffer(sizes, "list sizes", ctx).await?; + + match export_device_list_view_offsets( + offsets_ptype, + offsets_buffer.clone(), + sizes_ptype, + sizes_buffer.clone(), + len, + ctx, + ) + .await? + { + DeviceListViewOffsets::Contiguous(offsets_buffer) => { + export_list_layout( + elements, + len, + validity_buffer, + null_count, + offsets_buffer, + ctx, + ) + .await + } + DeviceListViewOffsets::RequiresRebuild => { + export_rebuilt_primitive_list_view( + elements, + offsets_ptype, + offsets_buffer, + sizes_ptype, + sizes_buffer, + len, + validity_buffer, + null_count, + ctx, + ) + .await + } + } +} + +enum DeviceListViewOffsets { + Contiguous(BufferHandle), + RequiresRebuild, +} + +/// Build cuDF-supported `i32` Arrow `List` offsets from list-view offset/size device buffers. +#[expect(clippy::cognitive_complexity)] +async fn export_device_list_view_offsets( + offsets_ptype: PType, + offsets_buffer: BufferHandle, + sizes_ptype: PType, + sizes_buffer: BufferHandle, + len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + if len == 0 { + let offsets = ctx + .ensure_on_device(BufferHandle::new_host( + Buffer::from(vec![0i32]).into_byte_buffer(), + )) + .await?; + return Ok(DeviceListViewOffsets::Contiguous(offsets)); + } + + match_each_integer_ptype!(offsets_ptype, |O| { + match_each_integer_ptype!(sizes_ptype, |S| { + export_device_list_view_offsets_typed::(offsets_buffer, sizes_buffer, len, ctx) + .await + }) + }) +} + +async fn rebuild_primitive_list_view_typed( + offsets: BufferHandle, + sizes: BufferHandle, + values: BufferHandle, + elements_len: usize, + list_len: usize, + values_ptype: PType, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(BufferHandle, BufferHandle)> +where + O: NativePType + DeviceRepr + Send + Sync + 'static, + S: NativePType + DeviceRepr + Send + Sync + 'static, +{ + let status = new_list_view_status(ctx).await?; + let scan_len = list_len + 1; + + let scan_input = init_list_view_rebuild_scan::(&sizes, &status, list_len, ctx)?; + let output_offsets = BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new( + exclusive_sum_i32(&scan_input, scan_len, ctx)?, + ))); + + validate_list_view_rebuild_offsets::(&sizes, &output_offsets, &status, list_len, ctx)?; + check_list_view_rebuild_status(&status).await?; + + let total_values = total_values_from_offsets(&output_offsets, list_len).await?; + let value_width = values_ptype.byte_width(); + let output_values_bytes = total_values + .checked_mul(value_width) + .ok_or_else(|| vortex_err!("rebuilt list child byte length overflow"))?; + + let output_values = gather_rebuilt_primitive_values::( + &offsets, + &sizes, + &values, + &output_offsets, + output_values_bytes, + elements_len, + list_len, + value_width, + &status, + ctx, + )?; + check_list_view_rebuild_status(&status).await?; + + let values = BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(output_values))) + .slice(0..output_values_bytes); + Ok((output_offsets, values)) +} + +async fn new_list_view_status(ctx: &mut CudaExecutionCtx) -> VortexResult { + ctx.ensure_on_device(BufferHandle::new_host( + Buffer::from(vec![0u32]).into_byte_buffer(), + )) + .await +} + +async fn check_list_view_rebuild_status(status: &BufferHandle) -> VortexResult<()> { + match Buffer::::from_byte_buffer(status.try_to_host()?.await?)[0] { + 0 => Ok(()), + 1 => vortex_bail!( + "cannot export device-resident ListViewArray as Arrow List: offsets/sizes are invalid for the child elements" + ), + 2 => vortex_bail!( + "cannot export device-resident ListViewArray as Arrow List: offsets exceed i32 range required by cuDF" + ), + status => vortex_bail!("unexpected list-view rebuild status {status}"), + } +} + +fn init_list_view_rebuild_scan( + sizes: &BufferHandle, + status: &BufferHandle, + list_len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult> +where + S: NativePType + DeviceRepr + Send + Sync + 'static, +{ + let scan_len = list_len + 1; + let sizes_view = sizes.cuda_view::()?; + let status_view = status.cuda_view::()?; + let list_len_u64 = list_len as u64; + let scan_len_u64 = scan_len as u64; + let scan_input = ctx.device_alloc::(scan_len)?; + let init_kernel = ctx + .load_function_with_suffixes("list_view", &["rebuild_init_scan", &S::PTYPE.to_string()])?; + + ctx.launch_kernel(&init_kernel, scan_len, |args| { + args.arg(&sizes_view) + .arg(&scan_input) + .arg(&status_view) + .arg(&list_len_u64) + .arg(&scan_len_u64); + })?; + + Ok(scan_input) +} + +fn validate_list_view_rebuild_offsets( + sizes: &BufferHandle, + output_offsets: &BufferHandle, + status: &BufferHandle, + list_len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<()> +where + S: NativePType + DeviceRepr + Send + Sync + 'static, +{ + let sizes_view = sizes.cuda_view::()?; + let output_offsets_view = output_offsets.cuda_view::()?; + let status_view = status.cuda_view::()?; + let list_len_u64 = list_len as u64; + let validate_kernel = ctx.load_function_with_suffixes( + "list_view", + &["rebuild_validate_offsets", &S::PTYPE.to_string()], + )?; + + ctx.launch_kernel(&validate_kernel, list_len, |args| { + args.arg(&sizes_view) + .arg(&output_offsets_view) + .arg(&status_view) + .arg(&list_len_u64); + }) +} + +async fn total_values_from_offsets( + output_offsets: &BufferHandle, + list_len: usize, +) -> VortexResult { + let total_values = Buffer::::from_byte_buffer( + output_offsets + .slice_typed::(list_len..list_len + 1) + .try_to_host()? + .await?, + )[0]; + + usize::try_from(total_values).map_err(Into::into) +} + +#[expect(clippy::too_many_arguments)] +fn gather_rebuilt_primitive_values( + offsets: &BufferHandle, + sizes: &BufferHandle, + values: &BufferHandle, + output_offsets: &BufferHandle, + output_values_bytes: usize, + elements_len: usize, + list_len: usize, + value_width: usize, + status: &BufferHandle, + ctx: &mut CudaExecutionCtx, +) -> VortexResult> +where + O: NativePType + DeviceRepr + Send + Sync + 'static, + S: NativePType + DeviceRepr + Send + Sync + 'static, +{ + let offsets_view = offsets.cuda_view::()?; + let sizes_view = sizes.cuda_view::()?; + let values_view = values.cuda_view::()?; + let output_offsets_view = output_offsets.cuda_view::()?; + let output_values = ctx.device_alloc::(output_values_bytes.max(1))?; + let status_view = status.cuda_view::()?; + let list_len_u64 = list_len as u64; + let elements_len_u64 = elements_len as u64; + let value_width_u64 = value_width as u64; + let rebuild_kernel = ctx.load_function_with_suffixes( + "list_view", + &[ + "rebuild_primitive", + &O::PTYPE.to_string(), + &S::PTYPE.to_string(), + ], + )?; + + ctx.launch_kernel(&rebuild_kernel, list_len, |args| { + args.arg(&offsets_view) + .arg(&sizes_view) + .arg(&output_offsets_view) + .arg(&values_view) + .arg(&output_values) + .arg(&status_view) + .arg(&list_len_u64) + .arg(&elements_len_u64) + .arg(&value_width_u64); + })?; + + Ok(output_values) +} + +#[expect(clippy::cognitive_complexity, clippy::too_many_arguments)] +async fn export_rebuilt_primitive_list_view( + elements: ArrayRef, + offsets_ptype: PType, + offsets_buffer: BufferHandle, + sizes_ptype: PType, + sizes_buffer: BufferHandle, + len: usize, + validity_buffer: Option, + null_count: i64, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(ArrowArray, SyncEvent)> { + let canonical_elements = elements.execute_cuda(ctx).await?; + let Canonical::Primitive(elements) = canonical_elements else { + vortex_bail!( + "cannot export non-contiguous device-resident ListViewArray with {} child: GPU child rebuild only supports primitive children", + canonical_elements.dtype() + ); + }; + let elements_len = elements.len(); + let PrimitiveDataParts { + ptype, + buffer, + validity, + .. + } = elements.into_data_parts(); + + vortex_ensure!( + validity.no_nulls(), + "cannot export non-contiguous device-resident ListViewArray with nullable primitive child: GPU child validity rebuild is not implemented" + ); + + let values_buffer = ctx.ensure_on_device(buffer).await?; + let (offsets_buffer, values_buffer) = match_each_integer_ptype!(offsets_ptype, |O| { + match_each_integer_ptype!(sizes_ptype, |S| { + rebuild_primitive_list_view_typed::( + offsets_buffer, + sizes_buffer, + values_buffer, + elements_len, + len, + ptype, + ctx, + ) + .await + }) + })?; + + let rebuilt_elements = + PrimitiveArray::from_buffer_handle(values_buffer, ptype, validity).into_array(); + + export_list_layout( + rebuilt_elements, + len, + validity_buffer, + null_count, + offsets_buffer, + ctx, + ) + .await +} + +async fn primitive_device_buffer( + array: ArrayRef, + name: &str, + ctx: &mut CudaExecutionCtx, +) -> VortexResult<(PType, BufferHandle)> { + let canonical = array.execute_cuda(ctx).await?; + let Canonical::Primitive(primitive) = canonical else { + vortex_bail!("{name} must be primitive, got {}", canonical.dtype()); + }; + + let PrimitiveDataParts { ptype, buffer, .. } = primitive.into_data_parts(); + vortex_ensure!(ptype.is_int(), "{name} must have integer type, got {ptype}"); + + Ok((ptype, ctx.ensure_on_device(buffer).await?)) +} + +async fn export_device_list_view_offsets_typed( + offsets: BufferHandle, + sizes: BufferHandle, + len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult +where + O: NativePType + DeviceRepr + Send + Sync + 'static, + S: NativePType + DeviceRepr + Send + Sync + 'static, +{ + let output_len = len + 1; + let output = ctx.device_alloc::(output_len)?; + + let status = ctx + .ensure_on_device(BufferHandle::new_host( + Buffer::from(vec![0u32]).into_byte_buffer(), + )) + .await?; + + let offsets_view = offsets.cuda_view::()?; + let sizes_view = sizes.cuda_view::()?; + let status_view = status.cuda_view::()?; + let list_len_u64 = len as u64; + + let kernel = ctx.load_function_with_suffixes( + "list_view", + &["offsets", &O::PTYPE.to_string(), &S::PTYPE.to_string()], + )?; + ctx.launch_kernel(&kernel, len, |args| { + args.arg(&offsets_view) + .arg(&sizes_view) + .arg(&output) + .arg(&status_view) + .arg(&list_len_u64); + })?; + + match Buffer::::from_byte_buffer(status.try_to_host()?.await?)[0] { + 0 => Ok(DeviceListViewOffsets::Contiguous(BufferHandle::new_device( + Arc::new(CudaDeviceBuffer::new(output)), + ))), + 1 => Ok(DeviceListViewOffsets::RequiresRebuild), + 2 => vortex_bail!( + "cannot export device-resident ListViewArray as Arrow List: offsets exceed i32 range required by cuDF" + ), + status => vortex_bail!("unexpected list-view offsets kernel status {status}"), + } +} diff --git a/vortex-cuda/src/arrow/mod.rs b/vortex-cuda/src/arrow/mod.rs index 3739a12e6ec..e21383d96e7 100644 --- a/vortex-cuda/src/arrow/mod.rs +++ b/vortex-cuda/src/arrow/mod.rs @@ -9,6 +9,7 @@ //! More documentation at mod canonical; +mod list_view; use std::ffi::c_void; use std::fmt::Debug; @@ -25,6 +26,7 @@ use vortex::array::ArrayRef; use vortex::array::arrow::ArrowSessionExt; use vortex::array::buffer::BufferHandle; use vortex::dtype::DType; +use vortex::dtype::StructFields; use vortex::error::VortexResult; use vortex::error::vortex_err; @@ -208,16 +210,37 @@ fn arrow_schema_for_array( ctx: &mut CudaExecutionCtx, ) -> VortexResult { let arrow = ctx.execution_ctx().session().arrow(); - match array.dtype() { - DType::Struct(..) => Ok(FFI_ArrowSchema::try_from( - arrow.to_arrow_schema(array.dtype())?, - )?), + let dtype = arrow_device_export_dtype(array.dtype()); + match &dtype { + DType::Struct(..) => Ok(FFI_ArrowSchema::try_from(arrow.to_arrow_schema(&dtype)?)?), _ => Ok(FFI_ArrowSchema::try_from( - arrow.to_arrow_field("", array.dtype())?, + arrow.to_arrow_field("", &dtype)?, )?), } } +fn arrow_device_export_dtype(dtype: &DType) -> DType { + match dtype { + DType::List(element, nullability) => { + DType::List(Arc::new(arrow_device_export_dtype(element)), *nullability) + } + DType::FixedSizeList(element, _, nullability) => { + DType::List(Arc::new(arrow_device_export_dtype(element)), *nullability) + } + DType::Struct(fields, nullability) => DType::Struct( + StructFields::new( + fields.names().clone(), + fields + .fields() + .map(|dtype| arrow_device_export_dtype(&dtype)) + .collect(), + ), + *nullability, + ), + dtype => dtype.clone(), + } +} + /// A type that can convert a Vortex array into an [`ArrowDeviceArray`]. #[async_trait] pub trait ExportDeviceArray: Debug + Send + Sync + 'static { diff --git a/vortex-cuda/src/cub.rs b/vortex-cuda/src/cub.rs new file mode 100644 index 00000000000..4b6009e7cea --- /dev/null +++ b/vortex-cuda/src/cub.rs @@ -0,0 +1,49 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! CUDA wrappers around CUB scan primitives. + +use std::ffi::c_void; + +use cudarc::driver::CudaSlice; +use cudarc::driver::DevicePtr; +use cudarc::driver::DevicePtrMut; +use vortex::error::VortexResult; +use vortex::error::vortex_err; +use vortex_cub::scan; +use vortex_cub::scan::cudaStream_t; + +use crate::CudaExecutionCtx; + +pub(crate) fn exclusive_sum_i32( + input: &CudaSlice, + len: usize, + ctx: &mut CudaExecutionCtx, +) -> VortexResult> { + let len_i64 = i64::try_from(len)?; + let temp_bytes = scan::exclusive_sum_i32_temp_size(len_i64) + .map_err(|err| vortex_err!("CUB scan_exclusive_sum_i32_temp_size failed: {err}"))?; + + let mut temp = ctx.device_alloc::(temp_bytes.max(1))?; + let mut output = ctx.device_alloc::(len)?; + let stream = ctx.stream(); + let stream_ptr = stream.cu_stream() as cudaStream_t; + let (input_ptr, record_input) = input.device_ptr(stream); + let (output_ptr, record_output) = output.device_ptr_mut(stream); + let (temp_ptr, record_temp) = temp.device_ptr_mut(stream); + + ctx.launch_external(len, || unsafe { + scan::exclusive_sum_i32( + temp_ptr as *mut c_void, + temp_bytes, + input_ptr as *const i32, + output_ptr as *mut i32, + len_i64, + stream_ptr, + ) + .map_err(|err| vortex_err!("CUB scan_exclusive_sum_i32 failed: {err}")) + })?; + drop((record_input, record_output, record_temp)); + + Ok(output) +} diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index d2227579d9f..9e9de33a288 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -9,6 +9,7 @@ use tracing::info; pub mod arrow; mod canonical; +mod cub; mod device_buffer; mod device_read_at; pub mod dynamic_dispatch; diff --git a/vortex-test/e2e-cuda/src/lib.rs b/vortex-test/e2e-cuda/src/lib.rs index 0a8ca91f021..11a540ff70e 100644 --- a/vortex-test/e2e-cuda/src/lib.rs +++ b/vortex-test/e2e-cuda/src/lib.rs @@ -1,20 +1,13 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors -//! This file is a simple C-compatible API that is called from the cudf-test-harness at CI time. -//! -//! The flow is: -//! -//! * test harness calls `dlopen` in this library -//! * invokes the `export_array` function to get back the device array -//! * pass the arrays to `cudf`'s `from_arrow_device_column` -//! * run some operations on the loaded column view -//! * call `array->release()` to drop the data allocated from the Rust side - -#![expect(clippy::unwrap_used, clippy::expect_used)] +//! C ABI used by `cudf-test-harness` to export and validate Arrow Device data in CI. + +#![expect(clippy::expect_used)] use std::env; use std::mem; +use std::panic; use std::sync::Arc; use std::sync::LazyLock; @@ -27,6 +20,7 @@ use arrow_array::cast::AsArray; use arrow_array::ffi::FFI_ArrowArray; use arrow_array::ffi::from_ffi; use arrow_array::make_array; +use arrow_schema::DataType; use arrow_schema::Field; use arrow_schema::Fields; use arrow_schema::ffi::FFI_ArrowSchema; @@ -35,6 +29,8 @@ use vortex::array::ArrayRef as VortexArrayRef; use vortex::array::IntoArray; use vortex::array::VortexSessionExecute; use vortex::array::arrays::DecimalArray; +use vortex::array::arrays::FixedSizeListArray; +use vortex::array::arrays::ListArray; use vortex::array::arrays::PrimitiveArray; use vortex::array::arrays::StructArray; use vortex::array::arrays::TemporalArray; @@ -94,14 +90,54 @@ fn primitive_array() -> Result { }) } +fn list_array() -> VortexArrayRef { + ListArray::try_new( + PrimitiveArray::from_iter([10i32, 11, 12, 13, 14]).into_array(), + PrimitiveArray::from_iter([0i32, 2, 2, 5, 5, 5]).into_array(), + Validity::from_iter([true, false, true, true, false]), + ) + .expect("list array") + .into_array() +} + +fn fixed_size_list_array() -> VortexArrayRef { + FixedSizeListArray::new( + PrimitiveArray::from_iter(20i32..30).into_array(), + 2, + Validity::from_iter([true, false, true, true, false]), + 5, + ) + .into_array() +} + +fn fixed_size_list_as_list_array() -> VortexArrayRef { + ListArray::try_new( + PrimitiveArray::from_iter(20i32..30).into_array(), + PrimitiveArray::from_iter([0i32, 2, 4, 6, 8, 10]).into_array(), + Validity::from_iter([true, false, true, true, false]), + ) + .expect("fixed-size-list as list array") + .into_array() +} + /// # Safety -/// called by C++ code. +/// `schema_ptr` and `array_ptr` must be valid writable pointers. #[unsafe(no_mangle)] pub unsafe extern "C" fn export_array( schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDeviceArray, ) -> i32 { - let mut ctx = CudaSession::create_execution_ctx(&SESSION).unwrap(); + ffi_boundary("export_array", || export_array_inner(schema_ptr, array_ptr)) +} + +fn export_array_inner(schema_ptr: &mut FFI_ArrowSchema, array_ptr: &mut ArrowDeviceArray) -> i32 { + let mut ctx = match CudaSession::create_execution_ctx(&SESSION) { + Ok(ctx) => ctx, + Err(err) => { + eprintln!("error creating CUDA execution context: {err}"); + return 1; + } + }; let primitive = match primitive_array() { Ok(array) => array, @@ -128,12 +164,21 @@ pub unsafe extern "C" fn export_array( ); let array = StructArray::new( - FieldNames::from_iter(["prims", "decimals", "strings", "dates"]), + FieldNames::from_iter([ + "prims", + "decimals", + "strings", + "dates", + "lists", + "fixed_lists", + ]), vec![ primitive, decimal.into_array(), strings.into_array(), dates.into_array(), + list_array(), + fixed_size_list_array(), ], 5, Validity::NonNullable, @@ -154,16 +199,38 @@ pub unsafe extern "C" fn export_array( } /// # Safety -/// called by C++ code. +/// `ffi_schema` and `ffi_array` must describe a valid Arrow C Data array. #[unsafe(no_mangle)] pub unsafe extern "C" fn validate_array( ffi_schema: &FFI_ArrowSchema, ffi_array: &mut FFI_ArrowArray, ) -> i32 { - // SAFETY: the provided pointers must not be null, and must point at valid FFI Arrow types. + ffi_boundary("validate_array", || { + validate_array_inner(ffi_schema, ffi_array) + }) +} + +fn ffi_boundary(name: &str, f: impl FnOnce() -> i32) -> i32 { + match panic::catch_unwind(panic::AssertUnwindSafe(f)) { + Ok(code) => code, + Err(_) => { + eprintln!("panic in {name}"); + 1 + } + } +} + +fn validate_array_inner(ffi_schema: &FFI_ArrowSchema, ffi_array: &mut FFI_ArrowArray) -> i32 { + // SAFETY: guaranteed by the C ABI contract. let array_data = unsafe { let ffi_array = mem::replace(ffi_array, FFI_ArrowArray::empty()); - from_ffi(ffi_array, ffi_schema).expect("from_ffi failed") + match from_ffi(ffi_array, ffi_schema) { + Ok(array_data) => array_data, + Err(err) => { + eprintln!("from_ffi failed: {err}"); + return 1; + } + } }; let array = make_array(array_data); @@ -188,31 +255,72 @@ pub unsafe extern "C" fn validate_array( None, ]); let date = Date32Array::from(vec![Some(100i32), None, Some(300), Some(400), None]); + let list = SESSION + .arrow() + .execute_arrow(list_array(), None, &mut SESSION.create_execution_ctx()) + .expect("expected list Arrow array"); + let fixed_size_list = SESSION + .arrow() + .execute_arrow( + fixed_size_list_as_list_array(), + None, + &mut SESSION.create_execution_ctx(), + ) + .expect("expected fixed-size-list-as-list Arrow array"); let expected_fields = Fields::from_iter([ Field::new("prims", primitive.data_type().clone(), true), Field::new("decimals", decimal.data_type().clone(), true), Field::new("strings", string.data_type().clone(), true), Field::new("dates", date.data_type().clone(), true), + cudf_list_field("lists"), + cudf_list_field("fixed_lists"), ]); + if &expected_fields != struct_array.fields() { + eprintln!("wrong fields for host array"); + return 1; + } - assert_eq!( - &expected_fields, - struct_array.fields(), - "wrong fields for host array: {:?}", - struct_array.fields() - ); - - let expected_fields: [ArrowArrayRef; _] = [ + let expected_arrays: [ArrowArrayRef; 4] = [ primitive, Arc::new(decimal), Arc::new(string), Arc::new(date), ]; - for (expected, actual) in expected_fields.iter().zip(struct_array.columns()) { - assert_eq!(expected.as_ref(), actual.as_ref()); + for (idx, (expected, actual)) in expected_arrays + .iter() + .zip(struct_array.columns()) + .enumerate() + { + if expected.as_ref() != actual.as_ref() { + eprintln!("wrong values for host column {idx}"); + return 1; + } + } + + if !list_values_eq(list.as_ref(), struct_array.column(4).as_ref()) { + eprintln!("wrong values for lists column"); + return 1; + } + if !list_values_eq(fixed_size_list.as_ref(), struct_array.column(5).as_ref()) { + eprintln!("wrong values for fixed_lists column"); + return 1; } 0 } + +fn cudf_list_field(name: &str) -> Field { + Field::new_list(name, Field::new("element", DataType::Int32, false), true) +} + +fn list_values_eq(expected: &dyn Array, actual: &dyn Array) -> bool { + let expected = expected.as_list::(); + let actual = actual.as_list::(); + + expected.len() == actual.len() + && expected.value_offsets() == actual.value_offsets() + && (0..expected.len()).all(|idx| expected.is_null(idx) == actual.is_null(idx)) + && expected.values().as_ref() == actual.values().as_ref() +}