Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Host decompression #18114

Open
wants to merge 17 commits into
base: branch-25.04
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ add_library(
src/io/comp/brotli_dict.cpp
src/io/comp/comp.cpp
src/io/comp/comp.cu
src/io/comp/common.cpp
src/io/comp/cpu_unbz2.cpp
src/io/comp/debrotli.cu
src/io/comp/gpuinflate.cu
Expand Down
57 changes: 57 additions & 0 deletions cpp/src/io/comp/common.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "common_internal.hpp"
#include "nvcomp_adapter.hpp"

#include <cudf/io/types.hpp>

namespace cudf::io::detail {

// Translates a cudf compression type into the nvCOMP equivalent, if one exists.
// Types nvCOMP cannot handle map to std::nullopt so callers can fall back to
// host-side codecs.
[[nodiscard]] std::optional<nvcomp::compression_type> to_nvcomp_compression(
  compression_type compression)
{
  using nvcomp_type = nvcomp::compression_type;
  switch (compression) {
    case compression_type::SNAPPY: return nvcomp_type::SNAPPY;
    case compression_type::ZSTD: return nvcomp_type::ZSTD;
    case compression_type::LZ4: return nvcomp_type::LZ4;
    case compression_type::ZLIB: return nvcomp_type::DEFLATE;  // ZLIB framing uses DEFLATE in nvCOMP
    case compression_type::GZIP: return nvcomp_type::GZIP;
    default: return std::nullopt;
  }
}

/**
 * @brief Returns a human-readable display name for a compression type.
 *
 * @param compression Compression type to name
 * @return Display name of the compression type
 * @throws cudf::logic_error if @p compression is not a known compression_type value
 */
[[nodiscard]] std::string compression_type_name(compression_type compression)
{
  switch (compression) {
    case compression_type::NONE: return "NONE";
    case compression_type::AUTO: return "AUTO";
    case compression_type::SNAPPY: return "Snappy";
    case compression_type::GZIP: return "GZIP";
    case compression_type::BZIP2: return "BZIP2";
    case compression_type::BROTLI: return "Brotli";  // fixed typo: previously returned "Brotlu"
    case compression_type::ZIP: return "ZIP";
    case compression_type::XZ: return "XZ";
    case compression_type::ZLIB: return "ZLIB";
    case compression_type::LZ4: return "LZ4";
    case compression_type::LZO: return "LZO";
    case compression_type::ZSTD: return "ZStandard";
    default:
      CUDF_FAIL("Invalid compression type: " + std::to_string(static_cast<int>(compression)));
  }
}

} // namespace cudf::io::detail
21 changes: 20 additions & 1 deletion cpp/src/io/comp/common.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
* Copyright (c) 2024-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
#pragma once

#include <cstddef>
#include <cstdint>

namespace cudf::io::detail {

Expand All @@ -34,4 +35,22 @@ namespace cudf::io::detail {
*/
constexpr std::size_t BUFFER_PADDING_MULTIPLE{8};

/**
 * @brief Status of a compression/decompression operation.
 */
enum class compression_status : uint8_t {
  SUCCESS,          ///< Successful, output is valid
  FAILURE,          ///< Failed, output is invalid (e.g. input is unsupported in some way)
  SKIPPED,          ///< Operation skipped (if conversion, uncompressed data can be used)
  OUTPUT_OVERFLOW,  ///< Output buffer is too small; operation can succeed with larger output
};

/**
 * @brief Descriptor of compression/decompression result.
 */
struct compression_result {
  uint64_t bytes_written;     ///< Number of bytes written to the output buffer
  compression_status status;  ///< Outcome of the operation; output is valid only on SUCCESS
};

} // namespace cudf::io::detail
44 changes: 44 additions & 0 deletions cpp/src/io/comp/common_internal.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "nvcomp_adapter.hpp"

#include <cudf/io/types.hpp>

#include <optional>

namespace cudf::io::detail {

/**
 * @brief GZIP header flags
 *
 * Bit values of the FLG byte in the GZIP member header.
 * See https://tools.ietf.org/html/rfc1952
 */
namespace GZIPHeaderFlag {
constexpr uint8_t ftext    = 0x01;  // FTEXT: file is probably ASCII text (hint only)
constexpr uint8_t fhcrc    = 0x02;  // FHCRC: CRC16 of the header is present
constexpr uint8_t fextra   = 0x04;  // FEXTRA: optional extra fields are present
constexpr uint8_t fname    = 0x08;  // FNAME: zero-terminated original file name is present
constexpr uint8_t fcomment = 0x10;  // FCOMMENT: zero-terminated comment is present
};  // namespace GZIPHeaderFlag

/**
 * @brief Maps a cudf compression type to the matching nvCOMP compression type.
 *
 * @param compression cudf compression type
 * @return Equivalent nvcomp::compression_type, or std::nullopt when nvCOMP has no
 * equivalent for this type
 */
[[nodiscard]] std::optional<nvcomp::compression_type> to_nvcomp_compression(
  compression_type compression);

/**
 * @brief Returns a human-readable display name for a compression type.
 *
 * @param compression Compression type to name
 * @return Display name of the compression type; throws on invalid enum values
 */
[[nodiscard]] std::string compression_type_name(compression_type compression);

} // namespace cudf::io::detail
36 changes: 9 additions & 27 deletions cpp/src/io/comp/comp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

#include "comp.hpp"

#include "common_internal.hpp"
#include "gpuinflate.hpp"
#include "io/utilities/getenv_or.hpp"
#include "nvcomp_adapter.hpp"

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda_memcpy.hpp>
#include <cudf/detail/utilities/host_worker_pool.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>
#include <cudf/detail/utilities/vector_factories.hpp>
#include <cudf/utilities/error.hpp>
Expand All @@ -37,26 +39,6 @@ namespace cudf::io::detail {

namespace {

auto& h_comp_pool()
{
static const std::size_t default_pool_size = std::min(32u, std::thread::hardware_concurrency());
static const std::size_t pool_size =
getenv_or("LIBCUDF_HOST_COMPRESSION_NUM_THREADS", default_pool_size);
static BS::thread_pool pool(pool_size);
return pool;
}

std::optional<nvcomp::compression_type> to_nvcomp_compression(compression_type compression)
{
switch (compression) {
case compression_type::SNAPPY: return nvcomp::compression_type::SNAPPY;
case compression_type::ZSTD: return nvcomp::compression_type::ZSTD;
case compression_type::LZ4: return nvcomp::compression_type::LZ4;
case compression_type::ZLIB: return nvcomp::compression_type::DEFLATE;
default: return std::nullopt;
}
}

/**
* @brief GZIP host compressor (includes header)
*/
Expand Down Expand Up @@ -333,7 +315,7 @@ void host_compress(compression_type compression,
auto const num_streams =
std::min<std::size_t>({num_chunks,
cudf::detail::global_cuda_stream_pool().get_stream_pool_size(),
h_comp_pool().get_thread_count()});
cudf::detail::host_worker_pool().get_thread_count()});
auto const streams = cudf::detail::fork_streams(stream, num_streams);
for (size_t i = 0; i < num_chunks; ++i) {
auto const idx = task_order[i];
Expand All @@ -346,7 +328,7 @@ void host_compress(compression_type compression,
cudf::detail::cuda_memcpy<uint8_t>(d_out.subspan(0, h_out.size()), h_out, cur_stream);
return h_out.size();
};
tasks.emplace_back(h_comp_pool().submit_task(std::move(task)));
tasks.emplace_back(cudf::detail::host_worker_pool().submit_task(std::move(task)));
}

for (auto i = 0ul; i < num_chunks; ++i) {
Expand All @@ -369,6 +351,7 @@ void host_compress(compression_type compression,
{
auto const nvcomp_type = to_nvcomp_compression(compression);
switch (compression) {
case compression_type::GZIP:
case compression_type::LZ4:
case compression_type::ZLIB:
case compression_type::ZSTD: return not nvcomp::is_compression_disabled(nvcomp_type.value());
Expand All @@ -383,9 +366,8 @@ void host_compress(compression_type compression,
[[maybe_unused]] device_span<device_span<uint8_t const> const> inputs,
[[maybe_unused]] device_span<device_span<uint8_t> const> outputs)
{
CUDF_EXPECTS(
not host_compression_supported(compression) or device_compression_supported(compression),
"Unsupported compression type");
CUDF_EXPECTS(host_compression_supported(compression) or device_compression_supported(compression),
"Unsupported compression type: " + compression_type_name(compression));
if (not host_compression_supported(compression)) { return false; }
if (not device_compression_supported(compression)) { return true; }
// If both host and device compression are supported, use the host if the env var is set
Expand Down Expand Up @@ -421,7 +403,7 @@ std::optional<size_t> compress_max_allowed_chunk_size(compression_type compressi
if (auto nvcomp_type = to_nvcomp_compression(compression); nvcomp_type.has_value()) {
return nvcomp::compress_max_output_chunk_size(*nvcomp_type, uncompressed_size);
}
CUDF_FAIL("Unsupported compression type");
CUDF_FAIL("Unsupported compression type: " + compression_type_name(compression));
}

std::vector<std::uint8_t> compress(compression_type compression,
Expand All @@ -432,7 +414,7 @@ std::vector<std::uint8_t> compress(compression_type compression,
switch (compression) {
case compression_type::GZIP: return compress_gzip(src);
case compression_type::SNAPPY: return snappy::compress(src);
default: CUDF_FAIL("Unsupported compression type");
default: CUDF_FAIL("Unsupported compression type: " + compression_type_name(compression));
}
}

Expand Down
18 changes: 0 additions & 18 deletions cpp/src/io/comp/comp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,6 @@
namespace CUDF_EXPORT cudf {
namespace io::detail {

/**
* @brief Status of a compression/decompression operation.
*/
enum class compression_status : uint8_t {
SUCCESS, ///< Successful, output is valid
FAILURE, ///< Failed, output is invalid (e.g. input is unsupported in some way)
SKIPPED, ///< Operation skipped (if conversion, uncompressed data can be used)
OUTPUT_OVERFLOW, ///< Output buffer is too small; operation can succeed with larger output
};

/**
* @brief Descriptor of compression/decompression result.
*/
struct compression_result {
uint64_t bytes_written;
compression_status status;
};

/**
* @brief Compresses a system memory buffer.
*
Expand Down
34 changes: 18 additions & 16 deletions cpp/src/io/comp/debrotli.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -2045,7 +2045,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 2)
*
* @return The size in bytes of required temporary memory
*/
size_t __host__ get_gpu_debrotli_scratch_size(int max_num_inputs)
size_t get_gpu_debrotli_scratch_size(int max_num_inputs)
{
uint32_t max_fb_size, min_fb_size, fb_size;
auto const sm_count = cudf::detail::num_multiprocessors();
Expand All @@ -2062,9 +2062,14 @@ size_t __host__ get_gpu_debrotli_scratch_size(int max_num_inputs)
min_fb_size = 10 * 1024; // TODO: Gather some statistics for typical meta-block size
// Allocate at least two worst-case metablocks or 1 metablock plus typical size for every other
// block
fb_size = max(max_fb_size * min(max_num_inputs, 2), max_fb_size + max_num_inputs * min_fb_size);
fb_size =
std::max(max_fb_size * std::min(max_num_inputs, 2), max_fb_size + max_num_inputs * min_fb_size);
// Add some room for alignment
return fb_size + 16 + sizeof(brotli_dictionary_s);
auto const aligned_size = fb_size + 16 + sizeof(brotli_dictionary_s);
auto const clamped_size = std::min(aligned_size, static_cast<size_t>(0xffff'ffffu));
CUDF_EXPECTS(clamped_size >= sizeof(brotli_dictionary_s),
"Insufficient scratch space for debrotli");
return clamped_size;
}

#define DUMP_FB_HEAP 0
Expand All @@ -2075,38 +2080,35 @@ size_t __host__ get_gpu_debrotli_scratch_size(int max_num_inputs)
void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<compression_result> results,
void* scratch,
size_t scratch_size,
rmm::cuda_stream_view stream)
{
// Scratch memory for decompressing
rmm::device_uvector<uint8_t> scratch(
cudf::io::detail::get_gpu_debrotli_scratch_size(inputs.size()), stream);

auto const count = inputs.size();
uint32_t fb_heap_size;
auto* scratch_u8 = static_cast<uint8_t*>(scratch);
dim3 dim_block(block_size, 1);
dim3 dim_grid(count, 1); // TODO: Check max grid dimensions vs max expected count

CUDF_EXPECTS(scratch_size >= sizeof(brotli_dictionary_s),
"Insufficient scratch space for debrotli");
scratch_size = min(scratch_size, static_cast<size_t>(0xffff'ffffu));
fb_heap_size = (uint32_t)((scratch_size - sizeof(brotli_dictionary_s)) & ~0xf);
auto const fb_heap_size = (uint32_t)((scratch.size() - sizeof(brotli_dictionary_s)) & ~0xf);

CUDF_CUDA_TRY(cudaMemsetAsync(scratch_u8, 0, 2 * sizeof(uint32_t), stream.value()));
CUDF_CUDA_TRY(cudaMemsetAsync(scratch.data(), 0, 2 * sizeof(uint32_t), stream.value()));
// NOTE: The 128KB dictionary copy can have a relatively large overhead since source isn't
// page-locked
CUDF_CUDA_TRY(cudaMemcpyAsync(scratch_u8 + fb_heap_size,
CUDF_CUDA_TRY(cudaMemcpyAsync(scratch.data() + fb_heap_size,
get_brotli_dictionary(),
sizeof(brotli_dictionary_s),
cudaMemcpyDefault,
stream.value()));
gpu_debrotli_kernel<<<dim_grid, dim_block, 0, stream.value()>>>(
inputs, outputs, results, scratch_u8, fb_heap_size);
inputs, outputs, results, scratch.data(), fb_heap_size);
#if DUMP_FB_HEAP
uint32_t dump[2];
uint32_t cur = 0;
printf("heap dump (%d bytes)\n", fb_heap_size);
while (cur < fb_heap_size && !(cur & 3)) {
CUDF_CUDA_TRY(cudaMemcpyAsync(
&dump[0], scratch_u8 + cur, 2 * sizeof(uint32_t), cudaMemcpyDefault, stream.value()));
&dump[0], scratch.data() + cur, 2 * sizeof(uint32_t), cudaMemcpyDefault, stream.value()));
stream.synchronize();
printf("@%d: next = %d, size = %d\n", cur, dump[0], dump[1]);
cur = (dump[0] > cur) ? dump[0] : 0xffff'ffffu;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/comp/gpuinflate.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2024, NVIDIA CORPORATION.
* Copyright (c) 2018-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -43,9 +43,9 @@ misrepresented as being the original software.
Mark Adler [email protected]
*/

#include "common_internal.hpp"
#include "gpuinflate.hpp"
#include "io/utilities/block_utils.cuh"
#include "io_uncomp.hpp"

#include <rmm/cuda_stream_view.hpp>

Expand Down
4 changes: 1 addition & 3 deletions cpp/src/io/comp/gpuinflate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#pragma once

#include "io/comp/comp.hpp"
#include "common.hpp"

#include <cudf/io/types.hpp>
#include <cudf/utilities/export.hpp>
Expand Down Expand Up @@ -104,8 +104,6 @@ CUDF_EXPORT
void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
device_span<device_span<uint8_t> const> outputs,
device_span<compression_result> results,
void* scratch,
size_t scratch_size,
rmm::cuda_stream_view stream);

/**
Expand Down
Loading