Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions vortex-btrblocks/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl BtrBlocksCompressorBuilder {
self
}

/// Adds compact encoding schemes (Zstd for strings, Pco for numerics).
/// Adds compact encoding schemes (Zstd for strings and binary, Pco for numerics).
///
/// This provides better compression ratios than the default, especially for floating-point
/// heavy datasets. Requires the `zstd` feature. When the `pco` feature is also enabled,
Expand All @@ -146,7 +146,9 @@ impl BtrBlocksCompressorBuilder {
/// Panics if any of the compact schemes are already present.
#[cfg(feature = "zstd")]
pub fn with_compact(self) -> Self {
let builder = self.with_new_scheme(&string::ZstdScheme);
let builder = self
.with_new_scheme(&string::ZstdScheme)
.with_new_scheme(&binary::ZstdScheme);

#[cfg(feature = "pco")]
let builder = builder
Expand All @@ -172,7 +174,7 @@ impl BtrBlocksCompressorBuilder {
self.with_new_scheme(&TurboQuantScheme)
}

/// Excludes schemes without CUDA kernel support and adds Zstd for string compression.
/// Excludes schemes without CUDA kernel support and adds Zstd for string and binary compression.
///
/// With the `unstable_encodings` feature, buffer-level Zstd compression is used which
/// preserves the array buffer layout for zero-conversion GPU decompression. Without it,
Expand All @@ -197,9 +199,13 @@ impl BtrBlocksCompressorBuilder {
let builder = self.exclude_schemes(excluded);

#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
let builder = builder.with_new_scheme(&string::ZstdBuffersScheme);
let builder = builder
.with_new_scheme(&string::ZstdBuffersScheme)
.with_new_scheme(&binary::ZstdBuffersScheme);
#[cfg(all(feature = "zstd", not(feature = "unstable_encodings")))]
let builder = builder.with_new_scheme(&string::ZstdScheme);
let builder = builder
.with_new_scheme(&string::ZstdScheme)
.with_new_scheme(&binary::ZstdScheme);

builder
}
Expand Down
66 changes: 66 additions & 0 deletions vortex-btrblocks/src/canonical_compressor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ mod tests {
use vortex_session::VortexSession;

use crate::BtrBlocksCompressor;
#[cfg(feature = "zstd")]
use crate::BtrBlocksCompressorBuilder;

static SESSION: LazyLock<VortexSession> =
LazyLock::new(|| VortexSession::empty().with::<ArraySession>());
Expand Down Expand Up @@ -226,4 +228,68 @@ mod tests {
assert_arrays_eq!(compressed, array);
Ok(())
}

#[cfg(feature = "zstd")]
#[test]
fn test_compact_binary_zstd_compressed() -> VortexResult<()> {
let values = (0..1024)
.map(|idx| {
let mut value = Vec::from(&b"common binary payload prefix "[..]);
value.extend_from_slice(&(idx as u32).to_le_bytes());
value.extend_from_slice(&[b'x'; 96]);
value
})
.collect::<Vec<_>>();
let array = VarBinViewArray::from_iter(
values.iter().map(|value| Some(value.as_slice())),
DType::Binary(Nullability::NonNullable),
);

let compressor = BtrBlocksCompressorBuilder::default().with_compact().build();
let compressed = compressor.compress(
&array.clone().into_array(),
&mut SESSION.create_execution_ctx(),
)?;

assert!(
compressed.is::<vortex_zstd::Zstd>(),
"expected Zstd, got {}",
compressed.encoding_id()
);
assert_arrays_eq!(compressed, array);
Ok(())
}

#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
#[test]
fn test_cuda_compatible_binary_zstd_buffers_compressed() -> VortexResult<()> {
let values = (0..1024)
.map(|idx| {
let mut value = Vec::from(&b"common binary payload prefix "[..]);
value.extend_from_slice(&(idx as u32).to_le_bytes());
value.extend_from_slice(&[b'x'; 96]);
value
})
.collect::<Vec<_>>();
let array = VarBinViewArray::from_iter(
values.iter().map(|value| Some(value.as_slice())),
DType::Binary(Nullability::NonNullable),
);

let compressor = BtrBlocksCompressorBuilder::default()
.only_cuda_compatible()
.build();
let compressed = compressor.compress(
&array.clone().into_array(),
&mut SESSION.create_execution_ctx(),
)?;

assert!(
compressed.is::<vortex_zstd::ZstdBuffers>(),
"expected ZstdBuffers, got {}",
compressed.encoding_id()
);
assert_arrays_eq!(compressed, array);
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@

//! Binary compression schemes.

#[cfg(feature = "zstd")]
mod zstd;
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
mod zstd_buffers;

// Re-export builtin schemes from vortex-compressor.
pub use vortex_compressor::builtins::BinaryConstantScheme;
pub use vortex_compressor::builtins::BinaryDictScheme;
#[cfg(feature = "zstd")]
pub use zstd::ZstdScheme;
#[cfg(all(feature = "zstd", feature = "unstable_encodings"))]
pub use zstd_buffers::ZstdBuffersScheme;
54 changes: 54 additions & 0 deletions vortex-btrblocks/src/schemes/binary/zstd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Zstd compression for binary arrays.

use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_error::VortexResult;

use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::Scheme;

/// Zstd compression without dictionaries for binary arrays.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdScheme;

impl Scheme for ZstdScheme {
fn scheme_name(&self) -> &'static str {
"vortex.binary.zstd"
}

fn matches(&self, canonical: &Canonical) -> bool {
canonical.dtype().is_binary()
}

fn expected_compression_ratio(
&self,
_data: &ArrayAndStats,
_compress_ctx: CompressorContext,
_exec_ctx: &mut ExecutionCtx,
) -> CompressionEstimate {
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
&self,
_compressor: &CascadingCompressor,
data: &ArrayAndStats,
_compress_ctx: CompressorContext,
exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
let compacted = data.array_as_varbinview().into_owned().compact_buffers()?;
Ok(
vortex_zstd::Zstd::from_var_bin_view_without_dict(&compacted, 3, 8192, exec_ctx)?
.into_array(),
)
}
}
50 changes: 50 additions & 0 deletions vortex-btrblocks/src/schemes/binary/zstd_buffers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! Zstd buffer-level binary compression preserving array layout for GPU decompression.

use vortex_array::ArrayRef;
use vortex_array::Canonical;
use vortex_array::ExecutionCtx;
use vortex_array::IntoArray;
use vortex_compressor::estimate::CompressionEstimate;
use vortex_compressor::estimate::DeferredEstimate;
use vortex_error::VortexResult;

use crate::ArrayAndStats;
use crate::CascadingCompressor;
use crate::CompressorContext;
use crate::Scheme;

/// Zstd buffer-level compression preserving array layout for GPU decompression.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub struct ZstdBuffersScheme;

impl Scheme for ZstdBuffersScheme {
fn scheme_name(&self) -> &'static str {
"vortex.binary.zstd_buffers"
}

fn matches(&self, canonical: &Canonical) -> bool {
canonical.dtype().is_binary()
}

fn expected_compression_ratio(
&self,
_data: &ArrayAndStats,
_compress_ctx: CompressorContext,
_exec_ctx: &mut ExecutionCtx,
) -> CompressionEstimate {
CompressionEstimate::Deferred(DeferredEstimate::Sample)
}

fn compress(
&self,
_compressor: &CascadingCompressor,
data: &ArrayAndStats,
_compress_ctx: CompressorContext,
exec_ctx: &mut ExecutionCtx,
) -> VortexResult<ArrayRef> {
Ok(vortex_zstd::ZstdBuffers::compress(data.array(), 3, exec_ctx.session())?.into_array())
}
}
Loading