Skip to content

Commit 7ceec68

Browse files
committed
feat: arrow device array list view export
Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>
1 parent 35212ef commit 7ceec68

14 files changed

Lines changed: 1596 additions & 11 deletions

File tree

vortex-cuda/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,7 @@ harness = false
104104
[[bench]]
105105
name = "fsst_cuda"
106106
harness = false
107+
108+
[[bench]]
109+
name = "list_view_cuda"
110+
harness = false
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! CUDA benchmarks for Arrow Device export of Vortex list-view arrays.
5+
6+
#![expect(clippy::cast_possible_truncation)]
7+
#![expect(clippy::unwrap_used)]
8+
9+
#[allow(dead_code)]
10+
mod bench_config;
11+
mod timed_launch_strategy;
12+
13+
use std::sync::Arc;
14+
use std::sync::atomic::Ordering;
15+
use std::time::Duration;
16+
17+
use criterion::BenchmarkId;
18+
use criterion::Criterion;
19+
use criterion::Throughput;
20+
use futures::executor::block_on;
21+
use vortex::array::ArrayRef;
22+
use vortex::array::IntoArray;
23+
use vortex::array::arrays::ListViewArray;
24+
use vortex::array::arrays::PrimitiveArray;
25+
use vortex::array::validity::Validity;
26+
use vortex::dtype::PType;
27+
use vortex::error::VortexExpect;
28+
use vortex::error::VortexResult;
29+
use vortex::session::VortexSession;
30+
use vortex_cuda::CudaExecutionCtx;
31+
use vortex_cuda::CudaSession;
32+
use vortex_cuda::arrow::ArrowDeviceArray;
33+
use vortex_cuda::arrow::DeviceArrayExt;
34+
use vortex_cuda_macros::cuda_available;
35+
use vortex_cuda_macros::cuda_not_available;
36+
37+
use crate::timed_launch_strategy::TimedLaunchStrategy;
38+
39+
const LIST_VIEW_CONTIGUOUS_BENCH_SIZES: &[(usize, &str)] = &[(10_000_000, "10M")];
40+
const LIST_VIEW_REBUILD_BENCH_SIZES: &[(usize, &str)] = &[(10_000_000, "10M")];
41+
42+
async fn primitive_i32_on_device(
43+
values: impl IntoIterator<Item = i32>,
44+
ctx: &mut CudaExecutionCtx,
45+
) -> VortexResult<ArrayRef> {
46+
let primitive = PrimitiveArray::from_iter(values);
47+
let handle = ctx
48+
.ensure_on_device(primitive.buffer_handle().clone())
49+
.await?;
50+
Ok(PrimitiveArray::from_buffer_handle(handle, PType::I32, Validity::NonNullable).into_array())
51+
}
52+
53+
async fn contiguous_list_view(len: usize, ctx: &mut CudaExecutionCtx) -> VortexResult<ArrayRef> {
54+
let elements = primitive_i32_on_device((0..len).map(|value| value as i32), ctx).await?;
55+
let offsets = primitive_i32_on_device((0..len).map(|value| value as i32), ctx).await?;
56+
let sizes = primitive_i32_on_device(std::iter::repeat_n(1i32, len), ctx).await?;
57+
58+
Ok(ListViewArray::new(elements, offsets, sizes, Validity::NonNullable).into_array())
59+
}
60+
61+
async fn non_contiguous_primitive_list_view(
62+
len: usize,
63+
ctx: &mut CudaExecutionCtx,
64+
) -> VortexResult<ArrayRef> {
65+
let elements = primitive_i32_on_device((0..len).map(|value| value as i32), ctx).await?;
66+
let offsets = primitive_i32_on_device((0..len).rev().map(|value| value as i32), ctx).await?;
67+
let sizes = primitive_i32_on_device(std::iter::repeat_n(1i32, len), ctx).await?;
68+
69+
Ok(ListViewArray::new(elements, offsets, sizes, Validity::NonNullable).into_array())
70+
}
71+
72+
unsafe fn release_arrow_device_array(array: &mut ArrowDeviceArray) {
73+
unsafe {
74+
if let Some(release) = array.array.release {
75+
release(&raw mut array.array);
76+
}
77+
}
78+
}
79+
80+
fn benchmark_list_view_export(c: &mut Criterion) {
81+
let mut group = c.benchmark_group("cuda");
82+
83+
for &(len, len_label) in LIST_VIEW_CONTIGUOUS_BENCH_SIZES {
84+
// Contiguous path reads offsets/sizes and writes Arrow offsets.
85+
group.throughput(Throughput::Bytes((len * size_of::<i32>() * 3) as u64));
86+
group.bench_with_input(
87+
BenchmarkId::new("cuda/list_view/contiguous_offsets", len_label),
88+
&len,
89+
|b, &len| {
90+
b.iter_custom(|iters| {
91+
let timed = TimedLaunchStrategy::default();
92+
let timer = timed.timer();
93+
94+
let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())
95+
.vortex_expect("failed to create execution context")
96+
.with_launch_strategy(Arc::new(timed));
97+
let array = block_on(contiguous_list_view(len, &mut cuda_ctx))
98+
.vortex_expect("failed to create list-view fixture");
99+
100+
for _ in 0..iters {
101+
let mut exported =
102+
block_on(array.clone().export_device_array(&mut cuda_ctx))
103+
.vortex_expect("failed to export device array");
104+
unsafe { release_arrow_device_array(&mut exported) };
105+
}
106+
107+
Duration::from_nanos(timer.load(Ordering::Relaxed))
108+
});
109+
},
110+
);
111+
}
112+
113+
for &(len, len_label) in LIST_VIEW_REBUILD_BENCH_SIZES {
114+
// Rebuild path scans sizes into Arrow offsets, then gathers primitive child values.
115+
group.throughput(Throughput::Bytes((len * size_of::<i32>() * 4) as u64));
116+
group.bench_with_input(
117+
BenchmarkId::new("cuda/list_view/rebuild_primitive", len_label),
118+
&len,
119+
|b, &len| {
120+
b.iter_custom(|iters| {
121+
let timed = TimedLaunchStrategy::default();
122+
let timer = timed.timer();
123+
124+
let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())
125+
.vortex_expect("failed to create execution context")
126+
.with_launch_strategy(Arc::new(timed));
127+
let array = block_on(non_contiguous_primitive_list_view(len, &mut cuda_ctx))
128+
.vortex_expect("failed to create list-view fixture");
129+
130+
for _ in 0..iters {
131+
let mut exported =
132+
block_on(array.clone().export_device_array(&mut cuda_ctx))
133+
.vortex_expect("failed to export device array");
134+
unsafe { release_arrow_device_array(&mut exported) };
135+
}
136+
137+
Duration::from_nanos(timer.load(Ordering::Relaxed))
138+
});
139+
},
140+
);
141+
}
142+
143+
group.finish();
144+
}
145+
146+
// Declares the Criterion group `benches`, configured by `bench_config::cuda_bench_config()` and
// running `benchmark_list_view_export` as its only target.
criterion::criterion_group! {
147+
name = benches;
148+
config = bench_config::cuda_bench_config();
149+
targets = benchmark_list_view_export
150+
}
151+
152+
// Criterion-generated `main`; presumably only kept when the `cuda_available` attribute macro
// detects CUDA (the macro is defined in `vortex_cuda_macros`, not shown here).
#[cuda_available]
153+
criterion::criterion_main!(benches);
154+
155+
// Fallback empty `main`; presumably the counterpart used when CUDA is not available, so the
// bench target still builds.
#[cuda_not_available]
156+
fn main() {}

vortex-cuda/cub/build.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ fn generate_rust_bindings(kernels_dir: &Path, out_dir: &Path) {
101101
.allowlist_function("filter_temp_size_.*")
102102
.allowlist_function("filter_bytemask_.*")
103103
.allowlist_function("filter_bitmask_.*")
104+
.allowlist_function("scan_exclusive_sum_.*")
104105
// Allow CUDA types
105106
.allowlist_type("cudaError_t")
106107
// Blocklist cudaStream_t and define it manually as an opaque pointer

vortex-cuda/cub/kernels/filter.cu

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,3 +188,38 @@ DEFINE_FILTER_BITMASK(f32, float)
188188
DEFINE_FILTER_BITMASK(f64, double)
189189
DEFINE_FILTER_BITMASK(i128, __int128_t)
190190
DEFINE_FILTER_BITMASK(i256, __int256_t)
191+
192+
// CUB scan operations use a two-call API: first call with null temporary storage to query the
193+
// number of scratch bytes required for a given type/item count, then call again with a device
194+
// scratch allocation of that size to enqueue the scan on the target stream. These wrappers expose
195+
// that pattern through a stable C ABI for the Rust side of vortex-cub.
196+
template <typename T>
197+
static cudaError_t scan_exclusive_sum_temp_size_impl(size_t *temp_bytes, int64_t num_items) {
198+
size_t bytes = 0;
199+
cudaError_t err = cub::DeviceScan::ExclusiveSum(nullptr,
200+
bytes,
201+
static_cast<const T *>(nullptr),
202+
static_cast<T *>(nullptr),
203+
num_items);
204+
*temp_bytes = bytes;
205+
return err;
206+
}
207+
208+
// Define one temp-size query and one scan launch function per supported element type. The suffix
209+
// is part of the exported symbol name consumed by bindgen/Rust; keep it in sync with
210+
// vortex-cuda/cub/src/scan.rs.
211+
#define DEFINE_SCAN_EXCLUSIVE_SUM(suffix, Type) \
212+
extern "C" cudaError_t scan_exclusive_sum_##suffix##_temp_size(size_t *temp_bytes, int64_t num_items) { \
213+
return scan_exclusive_sum_temp_size_impl<Type>(temp_bytes, num_items); \
214+
} \
215+
extern "C" cudaError_t scan_exclusive_sum_##suffix(void *d_temp, \
216+
size_t temp_bytes, \
217+
const Type *d_in, \
218+
Type *d_out, \
219+
int64_t num_items, \
220+
cudaStream_t stream) { \
221+
return cub::DeviceScan::ExclusiveSum(d_temp, temp_bytes, d_in, d_out, num_items, stream); \
222+
}
223+
224+
DEFINE_SCAN_EXCLUSIVE_SUM(i32, int32_t)
225+
DEFINE_SCAN_EXCLUSIVE_SUM(i64, int64_t)

vortex-cuda/cub/kernels/filter.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,24 @@ FILTER_TYPE_TABLE(DECLARE_FILTER_BITMASK)
8181

8282
#undef DECLARE_FILTER_BITMASK
8383

84+
cudaError_t scan_exclusive_sum_i32_temp_size(size_t *temp_bytes, int64_t num_items);
85+
86+
cudaError_t scan_exclusive_sum_i32(void *d_temp,
87+
size_t temp_bytes,
88+
const int32_t *d_in,
89+
int32_t *d_out,
90+
int64_t num_items,
91+
cudaStream_t stream);
92+
93+
cudaError_t scan_exclusive_sum_i64_temp_size(size_t *temp_bytes, int64_t num_items);
94+
95+
cudaError_t scan_exclusive_sum_i64(void *d_temp,
96+
size_t temp_bytes,
97+
const int64_t *d_in,
98+
int64_t *d_out,
99+
int64_t num_items,
100+
cudaStream_t stream);
101+
84102
#ifdef __cplusplus
85103
}
86104
#endif

vortex-cuda/cub/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub mod sys;
2323

2424
mod error;
2525
pub mod filter;
26+
pub mod scan;
2627

2728
pub use error::CubError;
2829

vortex-cuda/cub/src/scan.rs

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3+
4+
//! Safe wrappers around CUB DeviceScan operations used by CUDA kernels.
5+
6+
use std::ffi::c_void;
7+
8+
use crate::cub_library;
9+
use crate::error::CubError;
10+
use crate::error::check_cuda_error;
11+
pub use crate::sys::cudaStream_t;
12+
13+
/// Get temporary storage size for CUB `DeviceScan::ExclusiveSum<i32>`.
14+
pub fn exclusive_sum_i32_temp_size(num_items: i64) -> Result<usize, CubError> {
15+
let lib = cub_library()?;
16+
let mut temp_bytes: usize = 0;
17+
let err = unsafe { (lib.scan_exclusive_sum_i32_temp_size)(&raw mut temp_bytes, num_items) };
18+
check_cuda_error(err, "scan_exclusive_sum_i32_temp_size")?;
19+
Ok(temp_bytes)
20+
}
21+
22+
/// Execute CUB `DeviceScan::ExclusiveSum<i32>`.
23+
///
24+
/// # Safety
25+
///
26+
/// All device pointers must be valid and properly sized:
27+
/// - `d_temp` must have at least `temp_bytes` bytes allocated.
28+
/// - `d_in` and `d_out` must have at least `num_items` `i32` values.
29+
pub unsafe fn exclusive_sum_i32(
30+
d_temp: *mut c_void,
31+
temp_bytes: usize,
32+
d_in: *const i32,
33+
d_out: *mut i32,
34+
num_items: i64,
35+
stream: cudaStream_t,
36+
) -> Result<(), CubError> {
37+
let lib = cub_library()?;
38+
let err =
39+
unsafe { (lib.scan_exclusive_sum_i32)(d_temp, temp_bytes, d_in, d_out, num_items, stream) };
40+
check_cuda_error(err, "scan_exclusive_sum_i32")
41+
}
42+
43+
/// Get temporary storage size for CUB `DeviceScan::ExclusiveSum<i64>`.
44+
pub fn exclusive_sum_i64_temp_size(num_items: i64) -> Result<usize, CubError> {
45+
let lib = cub_library()?;
46+
let mut temp_bytes: usize = 0;
47+
let err = unsafe { (lib.scan_exclusive_sum_i64_temp_size)(&raw mut temp_bytes, num_items) };
48+
check_cuda_error(err, "scan_exclusive_sum_i64_temp_size")?;
49+
Ok(temp_bytes)
50+
}
51+
52+
/// Execute CUB `DeviceScan::ExclusiveSum<i64>`.
53+
///
54+
/// # Safety
55+
///
56+
/// All device pointers must be valid and properly sized:
57+
/// - `d_temp` must have at least `temp_bytes` bytes allocated.
58+
/// - `d_in` and `d_out` must have at least `num_items` `i64` values.
59+
pub unsafe fn exclusive_sum_i64(
60+
d_temp: *mut c_void,
61+
temp_bytes: usize,
62+
d_in: *const i64,
63+
d_out: *mut i64,
64+
num_items: i64,
65+
stream: cudaStream_t,
66+
) -> Result<(), CubError> {
67+
let lib = cub_library()?;
68+
let err =
69+
unsafe { (lib.scan_exclusive_sum_i64)(d_temp, temp_bytes, d_in, d_out, num_items, stream) };
70+
check_cuda_error(err, "scan_exclusive_sum_i64")
71+
}

0 commit comments

Comments
 (0)