diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 0282282b5f3..68bf3dd54ae 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -475,6 +475,7 @@ add_library(
   src/io/comp/brotli_dict.cpp
   src/io/comp/comp.cpp
   src/io/comp/comp.cu
+  src/io/comp/common.cpp
   src/io/comp/cpu_unbz2.cpp
   src/io/comp/debrotli.cu
   src/io/comp/gpuinflate.cu
diff --git a/cpp/src/io/comp/common.cpp b/cpp/src/io/comp/common.cpp
new file mode 100644
index 00000000000..7077c14d590
--- /dev/null
+++ b/cpp/src/io/comp/common.cpp
@@ -0,0 +1,57 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "common_internal.hpp"
+#include "nvcomp_adapter.hpp"
+
+#include <cudf/utilities/error.hpp>
+
+namespace cudf::io::detail {
+
+[[nodiscard]] std::optional<nvcomp::compression_type> to_nvcomp_compression(
+  compression_type compression)
+{
+  switch (compression) {
+    case compression_type::GZIP: return nvcomp::compression_type::GZIP;
+    case compression_type::LZ4: return nvcomp::compression_type::LZ4;
+    case compression_type::SNAPPY: return nvcomp::compression_type::SNAPPY;
+    case compression_type::ZLIB: return nvcomp::compression_type::DEFLATE;
+    case compression_type::ZSTD: return nvcomp::compression_type::ZSTD;
+    default: return std::nullopt;
+  }
+}
+
+[[nodiscard]] std::string compression_type_name(compression_type compression)
+{
+  switch (compression) {
+    case compression_type::NONE: return "NONE";
+    case compression_type::AUTO: return "AUTO";
+    case compression_type::SNAPPY: return "Snappy";
+    case compression_type::GZIP: return "GZIP";
+    case compression_type::BZIP2: return "BZIP2";
+    case compression_type::BROTLI: return "Brotli";
+    case compression_type::ZIP: return "ZIP";
+    case compression_type::XZ: return "XZ";
+    case compression_type::ZLIB: return "ZLIB";
+    case compression_type::LZ4: return "LZ4";
+    case compression_type::LZO: return "LZO";
+    case compression_type::ZSTD: return "ZStandard";
+    default:
+      CUDF_FAIL("Invalid compression type: " + std::to_string(static_cast<int>(compression)));
+  }
+}
+
+}  // namespace cudf::io::detail
diff --git a/cpp/src/io/comp/common.hpp b/cpp/src/io/comp/common.hpp
index a81ac60e03a..aa9507243e8 100644
--- a/cpp/src/io/comp/common.hpp
+++ b/cpp/src/io/comp/common.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2024, NVIDIA CORPORATION.
+ * Copyright (c) 2024-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
 #pragma once
 
 #include <cstddef>
+#include <cstdint>
 
 namespace cudf::io::detail {
 
@@ -34,4 +35,22 @@ namespace cudf::io::detail {
  */
 constexpr std::size_t BUFFER_PADDING_MULTIPLE{8};
 
+/**
+ * @brief Status of a compression/decompression operation.
+ */
+enum class compression_status : uint8_t {
+  SUCCESS,          ///< Successful, output is valid
+  FAILURE,          ///< Failed, output is invalid (e.g. input is unsupported in some way)
+  SKIPPED,          ///< Operation skipped (if conversion, uncompressed data can be used)
+  OUTPUT_OVERFLOW,  ///< Output buffer is too small; operation can succeed with larger output
+};
+
+/**
+ * @brief Descriptor of compression/decompression result.
+ */
+struct compression_result {
+  uint64_t bytes_written;
+  compression_status status;
+};
+
 }  // namespace cudf::io::detail
diff --git a/cpp/src/io/comp/common_internal.hpp b/cpp/src/io/comp/common_internal.hpp
new file mode 100644
index 00000000000..eb34698d178
--- /dev/null
+++ b/cpp/src/io/comp/common_internal.hpp
@@ -0,0 +1,44 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "nvcomp_adapter.hpp"
+
+#include <cudf/io/types.hpp>
+
+#include <cstdint>
+
+namespace cudf::io::detail {
+
+/**
+ * @brief GZIP header flags
+ * See https://tools.ietf.org/html/rfc1952
+ */
+namespace GZIPHeaderFlag {
+constexpr uint8_t ftext    = 0x01;  // ASCII text hint
+constexpr uint8_t fhcrc    = 0x02;  // Header CRC present
+constexpr uint8_t fextra   = 0x04;  // Extra fields present
+constexpr uint8_t fname    = 0x08;  // Original file name present
+constexpr uint8_t fcomment = 0x10;  // Comment present
+};  // namespace GZIPHeaderFlag
+
+[[nodiscard]] std::optional<nvcomp::compression_type> to_nvcomp_compression(
+  compression_type compression);
+
+[[nodiscard]] std::string compression_type_name(compression_type compression);
+
+}  // namespace cudf::io::detail
diff --git a/cpp/src/io/comp/comp.cpp b/cpp/src/io/comp/comp.cpp
index 280c07a4ff1..d990082d35c 100644
--- a/cpp/src/io/comp/comp.cpp
+++ b/cpp/src/io/comp/comp.cpp
@@ -16,12 +16,14 @@
 
 #include "comp.hpp"
 
+#include "common_internal.hpp"
 #include "gpuinflate.hpp"
 #include "io/utilities/getenv_or.hpp"
 #include "nvcomp_adapter.hpp"
 
 #include
 #include
+#include <cudf/detail/utilities/host_worker_pool.hpp>
 #include
 #include
 #include
 
@@ -37,26 +39,6 @@ namespace cudf::io::detail {
 
 namespace {
 
-auto& h_comp_pool()
-{
-  static const std::size_t default_pool_size = std::min(32u, std::thread::hardware_concurrency());
-  static const std::size_t pool_size =
-    getenv_or("LIBCUDF_HOST_COMPRESSION_NUM_THREADS", default_pool_size);
-  static BS::thread_pool pool(pool_size);
-  return pool;
-}
-
-std::optional<nvcomp::compression_type> to_nvcomp_compression(compression_type compression)
-{
-  switch (compression) {
-    case compression_type::SNAPPY: return nvcomp::compression_type::SNAPPY;
-    case compression_type::ZSTD: return nvcomp::compression_type::ZSTD;
-    case compression_type::LZ4: return nvcomp::compression_type::LZ4;
-    case compression_type::ZLIB: return nvcomp::compression_type::DEFLATE;
-    default: return std::nullopt;
-  }
-}
-
 /**
  * @brief GZIP host compressor (includes header)
  */
@@ -333,7 +315,7 @@ void host_compress(compression_type compression,
   auto const num_streams =
     std::min({num_chunks,
               cudf::detail::global_cuda_stream_pool().get_stream_pool_size(),
-              h_comp_pool().get_thread_count()});
+              cudf::detail::host_worker_pool().get_thread_count()});
   auto const streams = cudf::detail::fork_streams(stream, num_streams);
   for (size_t i = 0; i < num_chunks; ++i) {
     auto const idx = task_order[i];
@@ -346,7 +328,7 @@ void host_compress(compression_type compression,
       cudf::detail::cuda_memcpy(d_out.subspan(0, h_out.size()), h_out, cur_stream);
       return h_out.size();
     };
-    tasks.emplace_back(h_comp_pool().submit_task(std::move(task)));
+    tasks.emplace_back(cudf::detail::host_worker_pool().submit_task(std::move(task)));
   }
 
   for (auto i = 0ul; i < num_chunks; ++i) {
@@ -369,6 +351,7 @@ void host_compress(compression_type compression,
 {
   auto const nvcomp_type = to_nvcomp_compression(compression);
   switch (compression) {
+    case compression_type::GZIP:
     case compression_type::LZ4:
     case compression_type::ZLIB:
     case compression_type::ZSTD: return not nvcomp::is_compression_disabled(nvcomp_type.value());
@@ -383,9 +366,8 @@ void host_compress(compression_type compression,
   [[maybe_unused]] device_span<device_span<uint8_t const> const> inputs,
   [[maybe_unused]] device_span<device_span<uint8_t> const> outputs)
 {
-  CUDF_EXPECTS(
-    not host_compression_supported(compression) or device_compression_supported(compression),
-    "Unsupported compression type");
+  CUDF_EXPECTS(host_compression_supported(compression) or device_compression_supported(compression),
+               "Unsupported compression type: " + compression_type_name(compression));
   if (not host_compression_supported(compression)) { return false; }
   if (not device_compression_supported(compression)) { return true; }
   // If both host and device compression are supported, use the host if the env var is set
@@ -421,7 +403,7 @@ std::optional<size_t> compress_max_allowed_chunk_size(compression_type compressi
   if (auto nvcomp_type = to_nvcomp_compression(compression); nvcomp_type.has_value()) {
     return nvcomp::compress_max_output_chunk_size(*nvcomp_type, uncompressed_size);
   }
-  CUDF_FAIL("Unsupported compression type");
+  CUDF_FAIL("Unsupported compression type: " + compression_type_name(compression));
 }
 
 std::vector<std::uint8_t> compress(compression_type compression,
@@ -432,7 +414,7 @@ std::vector<std::uint8_t> compress(compression_type compression,
   switch (compression) {
     case compression_type::GZIP: return compress_gzip(src);
     case compression_type::SNAPPY: return snappy::compress(src);
-    default: CUDF_FAIL("Unsupported compression type");
+    default: CUDF_FAIL("Unsupported compression type: " + compression_type_name(compression));
   }
 }
diff --git a/cpp/src/io/comp/comp.hpp b/cpp/src/io/comp/comp.hpp
index 90932a11499..c2535c1e4e3 100644
--- a/cpp/src/io/comp/comp.hpp
+++ b/cpp/src/io/comp/comp.hpp
@@ -26,24 +26,6 @@ namespace CUDF_EXPORT cudf {
 namespace io::detail {
 
-/**
- * @brief Status of a compression/decompression operation.
- */
-enum class compression_status : uint8_t {
-  SUCCESS,          ///< Successful, output is valid
-  FAILURE,          ///< Failed, output is invalid (e.g. input is unsupported in some way)
-  SKIPPED,          ///< Operation skipped (if conversion, uncompressed data can be used)
-  OUTPUT_OVERFLOW,  ///< Output buffer is too small; operation can succeed with larger output
-};
-
-/**
- * @brief Descriptor of compression/decompression result.
- */
-struct compression_result {
-  uint64_t bytes_written;
-  compression_status status;
-};
-
 /**
  * @brief Compresses a system memory buffer.
  *
diff --git a/cpp/src/io/comp/debrotli.cu b/cpp/src/io/comp/debrotli.cu
index 151f72d262e..c044eb6583d 100644
--- a/cpp/src/io/comp/debrotli.cu
+++ b/cpp/src/io/comp/debrotli.cu
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2018-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2018-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -2045,7 +2045,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 2)
  *
  * @return The size in bytes of required temporary memory
  */
-size_t __host__ get_gpu_debrotli_scratch_size(int max_num_inputs)
+size_t get_gpu_debrotli_scratch_size(int max_num_inputs)
 {
   uint32_t max_fb_size, min_fb_size, fb_size;
   auto const sm_count = cudf::detail::num_multiprocessors();
@@ -2062,9 +2062,14 @@ size_t get_gpu_debrotli_scratch_size(int max_num_inputs)
   min_fb_size = 10 * 1024;  // TODO: Gather some statistics for typical meta-block size
   // Allocate at least two worst-case metablocks or 1 metablock plus typical size for every other
   // block
-  fb_size = max(max_fb_size * min(max_num_inputs, 2), max_fb_size + max_num_inputs * min_fb_size);
+  fb_size =
+    std::max(max_fb_size * std::min(max_num_inputs, 2), max_fb_size + max_num_inputs * min_fb_size);
   // Add some room for alignment
-  return fb_size + 16 + sizeof(brotli_dictionary_s);
+  auto const aligned_size = fb_size + 16 + sizeof(brotli_dictionary_s);
+  auto const clamped_size = std::min(aligned_size, static_cast<size_t>(0xffff'ffffu));
+  CUDF_EXPECTS(clamped_size >= sizeof(brotli_dictionary_s),
+               "Insufficient scratch space for debrotli");
+  return clamped_size;
 }
 
 #define DUMP_FB_HEAP 0
@@ -2075,38 +2080,35 @@ size_t get_gpu_debrotli_scratch_size(int max_num_inputs)
 void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
                   device_span<device_span<uint8_t> const> outputs,
                   device_span<compression_result> results,
-                  void* scratch,
-                  size_t scratch_size,
                   rmm::cuda_stream_view stream)
 {
+  // Scratch memory for decompressing
+  rmm::device_uvector<uint8_t> scratch(
+    cudf::io::detail::get_gpu_debrotli_scratch_size(inputs.size()), stream);
+
   auto const count = inputs.size();
-  uint32_t fb_heap_size;
-  auto* scratch_u8 = static_cast<uint8_t*>(scratch);
   dim3 dim_block(block_size, 1);
   dim3 dim_grid(count, 1);  // TODO: Check max grid dimensions vs max expected count
-  CUDF_EXPECTS(scratch_size >= sizeof(brotli_dictionary_s),
-               "Insufficient scratch space for debrotli");
-  scratch_size = min(scratch_size, static_cast<size_t>(0xffff'ffffu));
-  fb_heap_size = (uint32_t)((scratch_size - sizeof(brotli_dictionary_s)) & ~0xf);
+  auto const fb_heap_size = (uint32_t)((scratch.size() - sizeof(brotli_dictionary_s)) & ~0xf);
 
-  CUDF_CUDA_TRY(cudaMemsetAsync(scratch_u8, 0, 2 * sizeof(uint32_t), stream.value()));
+  CUDF_CUDA_TRY(cudaMemsetAsync(scratch.data(), 0, 2 * sizeof(uint32_t), stream.value()));
   // NOTE: The 128KB dictionary copy can have a relatively large overhead since source isn't
   // page-locked
-  CUDF_CUDA_TRY(cudaMemcpyAsync(scratch_u8 + fb_heap_size,
+  CUDF_CUDA_TRY(cudaMemcpyAsync(scratch.data() + fb_heap_size,
                                 get_brotli_dictionary(),
                                 sizeof(brotli_dictionary_s),
                                 cudaMemcpyDefault,
                                 stream.value()));
   gpu_debrotli_kernel<<<dim_grid, dim_block, 0, stream.value()>>>(
-    inputs, outputs, results, scratch_u8, fb_heap_size);
+    inputs, outputs, results, scratch.data(), fb_heap_size);
 #if DUMP_FB_HEAP
   uint32_t dump[2];
   uint32_t cur = 0;
   printf("heap dump (%d bytes)\n", fb_heap_size);
   while (cur < fb_heap_size && !(cur & 3)) {
     CUDF_CUDA_TRY(cudaMemcpyAsync(
-      &dump[0], scratch_u8 + cur, 2 * sizeof(uint32_t), cudaMemcpyDefault, stream.value()));
+      &dump[0], scratch.data() + cur, 2 * sizeof(uint32_t), cudaMemcpyDefault, stream.value()));
     stream.synchronize();
     printf("@%d: next = %d, size = %d\n", cur, dump[0], dump[1]);
     cur = (dump[0] > cur) ? dump[0] : 0xffff'ffffu;
diff --git a/cpp/src/io/comp/gpuinflate.cu b/cpp/src/io/comp/gpuinflate.cu
index 6e5ce4ce6c3..1927e88b58d 100644
--- a/cpp/src/io/comp/gpuinflate.cu
+++ b/cpp/src/io/comp/gpuinflate.cu
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2018-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2018-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -43,9 +43,9 @@ misrepresented as being the original software.
 Mark Adler    madler@alumni.caltech.edu
 */
 
+#include "common_internal.hpp"
 #include "gpuinflate.hpp"
 #include "io/utilities/block_utils.cuh"
-#include "io_uncomp.hpp"
 
 #include
diff --git a/cpp/src/io/comp/gpuinflate.hpp b/cpp/src/io/comp/gpuinflate.hpp
index 0a35b230242..d8123f6a24a 100644
--- a/cpp/src/io/comp/gpuinflate.hpp
+++ b/cpp/src/io/comp/gpuinflate.hpp
@@ -16,7 +16,7 @@
 
 #pragma once
 
-#include "io/comp/comp.hpp"
+#include "common.hpp"
 
 #include
 #include
@@ -104,8 +104,6 @@ CUDF_EXPORT
 void gpu_debrotli(device_span<device_span<uint8_t const> const> inputs,
                   device_span<device_span<uint8_t> const> outputs,
                   device_span<compression_result> results,
-                  void* scratch,
-                  size_t scratch_size,
                   rmm::cuda_stream_view stream);
 
 /**
diff --git a/cpp/src/io/comp/io_uncomp.hpp b/cpp/src/io/comp/io_uncomp.hpp
index 711a1c3274f..69ad944d383 100644
--- a/cpp/src/io/comp/io_uncomp.hpp
+++ b/cpp/src/io/comp/io_uncomp.hpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2018-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2018-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -52,6 +52,14 @@ size_t decompress(compression_type compression,
                   host_span<uint8_t const> src,
                   host_span<uint8_t> dst,
                   rmm::cuda_stream_view stream);
 
+void decompress(compression_type compression,
+                device_span<device_span<uint8_t const> const> inputs,
+                device_span<device_span<uint8_t> const> outputs,
+                device_span<compression_result> results,
+                size_t max_uncomp_chunk_size,
+                size_t max_total_uncomp_size,
+                rmm::cuda_stream_view stream);
+
 /**
  * @brief Without actually decompressing the compressed input buffer passed, return the size of
  * decompressed output. If the decompressed size cannot be extracted apriori, return zero.
@@ -64,16 +72,25 @@ size_t decompress(compression_type compression,
 size_t get_uncompressed_size(compression_type compression, host_span<uint8_t const> src);
 
 /**
- * @brief GZIP header flags
- * See https://tools.ietf.org/html/rfc1952
+ * @brief Struct to hold information about decompression.
+ *
+ * This struct contains details about the decompression process, including
+ * the type of compression, the number of pages, the maximum size
+ * of a decompressed page, and the total decompressed size.
+ */
+struct decompression_info {
+  compression_type type;
+  size_t num_pages;
+  size_t max_page_decompressed_size;
+  size_t total_decompressed_size;
+};
+
+/**
+ * @brief Returns the total scratch space required based on computed decompression_info
+ * data.
+ *
+ */
-namespace GZIPHeaderFlag {
-constexpr uint8_t ftext    = 0x01;  // ASCII text hint
-constexpr uint8_t fhcrc    = 0x02;  // Header CRC present
-constexpr uint8_t fextra   = 0x04;  // Extra fields present
-constexpr uint8_t fname    = 0x08;  // Original file name present
-constexpr uint8_t fcomment = 0x10;  // Comment present
-};  // namespace GZIPHeaderFlag
+[[nodiscard]] size_t get_decompression_scratch_size(decompression_info const& di);
 
 }  // namespace io::detail
 }  // namespace CUDF_EXPORT cudf
diff --git a/cpp/src/io/comp/nvcomp_adapter.cpp b/cpp/src/io/comp/nvcomp_adapter.cpp
index 7c191b03350..f0647e1c080 100644
--- a/cpp/src/io/comp/nvcomp_adapter.cpp
+++ b/cpp/src/io/comp/nvcomp_adapter.cpp
@@ -337,6 +337,7 @@ size_t compress_max_output_chunk_size(compression_type compression,
         capped_uncomp_bytes, nvcompBatchedSnappyDefaultOpts, &max_comp_chunk_size);
       break;
     case compression_type::DEFLATE:
+    case compression_type::GZIP:  // HACK!
       status = nvcompBatchedDeflateCompressGetMaxOutputChunkSize(
         capped_uncomp_bytes, nvcompBatchedDeflateDefaultOpts, &max_comp_chunk_size);
       break;
@@ -495,7 +496,8 @@ size_t required_alignment(compression_type compression)
 std::optional<size_t> compress_max_allowed_chunk_size(compression_type compression)
 {
   switch (compression) {
-    case compression_type::DEFLATE: return nvcompDeflateCompressionMaxAllowedChunkSize;
+    case compression_type::DEFLATE:
+    case compression_type::GZIP: return nvcompDeflateCompressionMaxAllowedChunkSize;
     case compression_type::SNAPPY: return nvcompSnappyCompressionMaxAllowedChunkSize;
     case compression_type::ZSTD: return nvcompZstdCompressionMaxAllowedChunkSize;
     case compression_type::LZ4: return nvcompLZ4CompressionMaxAllowedChunkSize;
diff --git a/cpp/src/io/comp/uncomp.cpp b/cpp/src/io/comp/uncomp.cpp
index 4ab5174387e..61c9b24cd6c 100644
--- a/cpp/src/io/comp/uncomp.cpp
+++ b/cpp/src/io/comp/uncomp.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2018-2024, NVIDIA CORPORATION.
+ * Copyright (c) 2018-2025, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -14,11 +14,17 @@
  * limitations under the License.
  */
 
+#include "common_internal.hpp"
+#include "gpuinflate.hpp"
+#include "io/utilities/getenv_or.hpp"
 #include "io/utilities/hostdevice_vector.hpp"
 #include "io_uncomp.hpp"
 #include "nvcomp_adapter.hpp"
 #include "unbz2.hpp"  // bz2 uncompress
 
+#include
+#include
+#include
 #include
 #include
 #include
 
@@ -27,6 +33,8 @@
 #include <zlib.h>   // uncompress
 #include <cstring>  // memset
+#include <future>
+#include <numeric>
 
 namespace cudf::io::detail {
 
@@ -530,7 +538,7 @@ source_properties get_source_properties(compression_type compression, host_span
       if (compression != compression_type::AUTO) break;
       [[fallthrough]];
     }
-    default: CUDF_FAIL("Unsupported compressed stream type");
+    default: CUDF_FAIL("Unsupported compressed stream type: " + compression_type_name(compression));
   }
 
   return source_properties{compression, comp_data, comp_len, uncomp_len};
@@ -553,7 +561,7 @@ size_t decompress(compression_type compression,
     case compression_type::ZLIB: return decompress_zlib(src, dst);
     case compression_type::SNAPPY: return decompress_snappy(src, dst);
     case compression_type::ZSTD: return decompress_zstd(src, dst, stream);
-    default: CUDF_FAIL("Unsupported compression type");
+    default: CUDF_FAIL("Unsupported compression type: " + compression_type_name(compression));
   }
 }
 
@@ -616,4 +624,156 @@ std::vector<uint8_t> decompress(compression_type compression, host_span
+void device_decompress(compression_type compression,
+                       device_span<device_span<uint8_t const> const> inputs,
+                       device_span<device_span<uint8_t> const> outputs,
+                       device_span<compression_result> results,
+                       size_t max_uncomp_chunk_size,
+                       size_t max_total_uncomp_size,
+                       rmm::cuda_stream_view stream)
+{
+  if (compression == compression_type::NONE) { return; }
+
+  auto const nvcomp_type = to_nvcomp_compression(compression);
+  auto nvcomp_disabled   = nvcomp_type.has_value() ? nvcomp::is_decompression_disabled(*nvcomp_type)
+                                                   : "invalid compression type";
+  if (not nvcomp_disabled) {
+    return nvcomp::batched_decompress(
+      *nvcomp_type, inputs, outputs, results, max_uncomp_chunk_size, max_total_uncomp_size, stream);
+  }
+
+  switch (compression) {
+    case compression_type::BROTLI: return gpu_debrotli(inputs, outputs, results, stream);
+    case compression_type::GZIP:
+      return gpuinflate(inputs, outputs, results, gzip_header_included::YES, stream);
+    case compression_type::SNAPPY: return gpu_unsnap(inputs, outputs, results, stream);
+    case compression_type::ZLIB:
+      return gpuinflate(inputs, outputs, results, gzip_header_included::NO, stream);
+    default: CUDF_FAIL("Decompression error: " + nvcomp_disabled.value());
+  }
+}
+
+void host_decompress(compression_type compression,
+                     device_span<device_span<uint8_t const> const> inputs,
+                     device_span<device_span<uint8_t> const> outputs,
+                     device_span<compression_result> results,
+                     rmm::cuda_stream_view stream)
+{
+  if (compression == compression_type::NONE) { return; }
+
+  auto const num_chunks = inputs.size();
+  auto h_results        = cudf::detail::make_host_vector<compression_result>(num_chunks, stream);
+  auto const h_inputs   = cudf::detail::make_host_vector_async(inputs, stream);
+  auto const h_outputs  = cudf::detail::make_host_vector_async(outputs, stream);
+  stream.synchronize();
+
+  // Generate order vector to submit largest tasks first
+  std::vector<size_t> task_order(num_chunks);
+  std::iota(task_order.begin(), task_order.end(), 0);
+  std::sort(task_order.begin(), task_order.end(), [&](size_t a, size_t b) {
+    return h_inputs[a].size() > h_inputs[b].size();
+  });
+
+  std::vector<std::future<size_t>> tasks;
+  auto const num_streams =
+    std::min({num_chunks,
+              cudf::detail::global_cuda_stream_pool().get_stream_pool_size(),
+              cudf::detail::host_worker_pool().get_thread_count()});
+  auto const streams = cudf::detail::fork_streams(stream, num_streams);
+  for (size_t i = 0; i < num_chunks; ++i) {
+    auto const idx        = task_order[i];
+    auto const cur_stream = streams[i % streams.size()];
+    auto task =
+      [d_in = h_inputs[idx], d_out = h_outputs[idx], cur_stream, compression]() -> size_t {
+      auto h_in = cudf::detail::make_pinned_vector_async<uint8_t>(d_in.size(), cur_stream);
+      cudf::detail::cuda_memcpy(h_in, d_in, cur_stream);
+      auto h_out             = cudf::detail::make_pinned_vector_async<uint8_t>(d_out.size(), cur_stream);
+      auto const uncomp_size = decompress(compression, h_in, h_out, cur_stream);
+      cudf::detail::cuda_memcpy(d_out.subspan(0, uncomp_size),
+                                host_span<uint8_t const>{h_out}.subspan(0, uncomp_size),
+                                cur_stream);
+      return uncomp_size;
+    };
+    tasks.emplace_back(cudf::detail::host_worker_pool().submit_task(std::move(task)));
+  }
+
+  for (auto i = 0ul; i < num_chunks; ++i) {
+    h_results[task_order[i]] = {tasks[i].get(), compression_status::SUCCESS};
+  }
+  cudf::detail::cuda_memcpy_async(results, h_results, stream);
+}
+
+[[nodiscard]] bool host_decompression_supported(compression_type compression)
+{
+  switch (compression) {
+    case compression_type::GZIP:
+    case compression_type::SNAPPY:
+    case compression_type::ZLIB:
+    case compression_type::ZSTD:
+    case compression_type::NONE: return true;
+    default: return false;
+  }
+}
+
+[[nodiscard]] bool device_decompression_supported(compression_type compression)
+{
+  auto const nvcomp_type = to_nvcomp_compression(compression);
+  switch (compression) {
+    case compression_type::ZSTD: return not nvcomp::is_decompression_disabled(nvcomp_type.value());
+    case compression_type::BROTLI:
+    case compression_type::GZIP:
+    case compression_type::LZ4:
+    case compression_type::SNAPPY:
+    case compression_type::ZLIB:
+    case compression_type::NONE: return true;
+    default: return false;
+  }
+}
+
+[[nodiscard]] bool use_host_decompression(compression_type compression)
+{
+  CUDF_EXPECTS(
+    host_decompression_supported(compression) or device_decompression_supported(compression),
+    "Unsupported compression type: " + compression_type_name(compression));
+  if (not host_decompression_supported(compression)) { return false; }
+  if (not device_decompression_supported(compression)) { return true; }
+  // If both host and device decompression are supported, use the host if the env var is set
+  return getenv_or("LIBCUDF_HOST_DECOMPRESSION", std::string{"OFF"}) == "ON";
+}
+
+[[nodiscard]] size_t get_decompression_scratch_size(decompression_info const& di)
+{
+  if (di.type == compression_type::NONE or use_host_decompression(di.type)) { return 0; }
+
+  auto const nvcomp_type = to_nvcomp_compression(di.type);
+  auto nvcomp_disabled   = nvcomp_type.has_value() ? nvcomp::is_decompression_disabled(*nvcomp_type)
+                                                   : "invalid compression type";
+  if (not nvcomp_disabled) {
+    return nvcomp::batched_decompress_temp_size(
+      nvcomp_type.value(), di.num_pages, di.max_page_decompressed_size, di.total_decompressed_size);
+  }
+
+  // Only the Brotli kernel requires scratch memory
+  if (di.type == compression_type::BROTLI) { return get_gpu_debrotli_scratch_size(di.num_pages); }
+  return 0;
+}
+
+void decompress(compression_type compression,
+                device_span<device_span<uint8_t const> const> inputs,
+                device_span<device_span<uint8_t> const> outputs,
+                device_span<compression_result> results,
+                size_t max_uncomp_chunk_size,
+                size_t max_total_uncomp_size,
+                rmm::cuda_stream_view stream)
+{
+  CUDF_FUNC_RANGE();
+  if (inputs.empty()) { return; }
+  if (use_host_decompression(compression)) {
+    return host_decompress(compression, inputs, outputs, results, stream);
+  } else {
+    return device_decompress(
+      compression, inputs, outputs, results, max_uncomp_chunk_size, max_total_uncomp_size, stream);
+  }
+}
+
 }  // namespace cudf::io::detail
diff --git a/cpp/src/io/orc/reader_impl_decode.cu b/cpp/src/io/orc/reader_impl_decode.cu
index 5ad28de1e8e..be258935a36 100644
--- a/cpp/src/io/orc/reader_impl_decode.cu
+++ b/cpp/src/io/orc/reader_impl_decode.cu
@@ -15,7 +15,7 @@
  */
 
 #include "io/comp/gpuinflate.hpp"
-#include "io/comp/nvcomp_adapter.hpp"
+#include "io/comp/io_uncomp.hpp"
 #include "io/orc/reader_impl.hpp"
 #include "io/orc/reader_impl_chunking.hpp"
 #include "io/orc/reader_impl_helpers.hpp"
@@ -190,79 +190,27 @@ rmm::device_buffer decompress_stripe_data(
   any_block_failure[0] = false;
   any_block_failure.host_to_device_async(stream);
 
-  // Dispatch batches of blocks to decompress
-  if (num_compressed_blocks > 0) {
-    device_span<device_span<uint8_t const>> inflate_in_view{inflate_in.data(),
-                                                            num_compressed_blocks};
-    device_span<device_span<uint8_t>> inflate_out_view{inflate_out.data(), num_compressed_blocks};
-    switch (decompressor.compression()) {
-      case compression_type::ZLIB:
-        if (nvcomp::is_decompression_disabled(nvcomp::compression_type::DEFLATE)) {
-          gpuinflate(
-            inflate_in_view, inflate_out_view, inflate_res, gzip_header_included::NO, stream);
-        } else {
-          nvcomp::batched_decompress(nvcomp::compression_type::DEFLATE,
-                                     inflate_in_view,
-                                     inflate_out_view,
-                                     inflate_res,
-                                     max_uncomp_block_size,
-                                     total_decomp_size,
-                                     stream);
-        }
-        break;
-      case compression_type::SNAPPY:
-        if (nvcomp::is_decompression_disabled(nvcomp::compression_type::SNAPPY)) {
-          gpu_unsnap(inflate_in_view, inflate_out_view, inflate_res, stream);
-        } else {
-          nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
-                                     inflate_in_view,
-                                     inflate_out_view,
-                                     inflate_res,
-                                     max_uncomp_block_size,
-                                     total_decomp_size,
-                                     stream);
-        }
-        break;
-      case compression_type::ZSTD:
-        if (auto const reason = nvcomp::is_decompression_disabled(nvcomp::compression_type::ZSTD);
-            reason) {
-          CUDF_FAIL("Decompression error: " + reason.value());
-        }
-        nvcomp::batched_decompress(nvcomp::compression_type::ZSTD,
-                                   inflate_in_view,
-                                   inflate_out_view,
-                                   inflate_res,
-                                   max_uncomp_block_size,
-                                   total_decomp_size,
-                                   stream);
-        break;
-      case compression_type::LZ4:
-        if (auto const reason = nvcomp::is_decompression_disabled(nvcomp::compression_type::LZ4);
-            reason) {
-          CUDF_FAIL("Decompression error: " + reason.value());
-        }
-        nvcomp::batched_decompress(nvcomp::compression_type::LZ4,
-                                   inflate_in_view,
-                                   inflate_out_view,
-                                   inflate_res,
-                                   max_uncomp_block_size,
-                                   total_decomp_size,
-                                   stream);
-        break;
-      default: CUDF_FAIL("Unexpected decompression dispatch"); break;
-    }
+  device_span<device_span<uint8_t const>> inflate_in_view{inflate_in.data(), num_compressed_blocks};
+  device_span<device_span<uint8_t>> inflate_out_view{inflate_out.data(), num_compressed_blocks};
+  cudf::io::detail::decompress(decompressor.compression(),
+                               inflate_in_view,
+                               inflate_out_view,
+                               inflate_res,
+                               max_uncomp_block_size,
+                               total_decomp_size,
+                               stream);
 
-    // Check if any block has been failed to decompress.
-    // Not using `thrust::any` or `thrust::count_if` to defer stream sync.
-    thrust::for_each(
-      rmm::exec_policy_nosync(stream),
-      thrust::make_counting_iterator(std::size_t{0}),
-      thrust::make_counting_iterator(inflate_res.size()),
-      [results           = inflate_res.begin(),
-       any_block_failure = any_block_failure.device_ptr()] __device__(auto const idx) {
-        if (results[idx].status != compression_status::SUCCESS) { *any_block_failure = true; }
-      });
-  }
+  // Check if any block has failed to decompress.
+  // Not using `thrust::any` or `thrust::count_if` to defer stream sync.
+  thrust::for_each(rmm::exec_policy_nosync(stream),
+                   thrust::make_counting_iterator(std::size_t{0}),
+                   thrust::make_counting_iterator(inflate_res.size()),
+                   [results           = inflate_res.begin(),
+                    any_block_failure = any_block_failure.device_ptr()] __device__(auto const idx) {
+                     if (results[idx].status != compression_status::SUCCESS) {
+                       *any_block_failure = true;
+                     }
+                   });
 
   if (num_uncompressed_blocks > 0) {
     device_span<device_span<uint8_t const>> copy_in_view{inflate_in.data() + num_compressed_blocks,
diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu
index 5242b18b574..efa4f516dd8 100644
--- a/cpp/src/io/parquet/reader_impl_chunking.cu
+++ b/cpp/src/io/parquet/reader_impl_chunking.cu
@@ -15,9 +15,8 @@
  */
 
 #include "compact_protocol_reader.hpp"
-#include "io/comp/comp.hpp"
 #include "io/comp/gpuinflate.hpp"
-#include "io/comp/nvcomp_adapter.hpp"
+#include "io/comp/io_uncomp.hpp"
 #include "io/utilities/time_utils.cuh"
 #include "reader_impl.hpp"
 #include "reader_impl_chunking.hpp"
@@ -47,9 +46,9 @@ namespace cudf::io::parquet::detail {
 
 namespace {
 
-namespace nvcomp = cudf::io::detail::nvcomp;
 using cudf::io::detail::compression_result;
 using cudf::io::detail::compression_status;
+using cudf::io::detail::decompression_info;
 
 struct split_info {
   row_range rows;
@@ -156,6 +155,19 @@ void print_cumulative_row_info(host_span<cumulative_page_info const> sizes,
 }
 #endif  // CHUNKING_DEBUG
 
+compression_type from_parquet_compression(Compression compression)
+{
+  switch (compression) {
+    case Compression::BROTLI: return compression_type::BROTLI;
+    case Compression::GZIP: return compression_type::GZIP;
+    case Compression::LZ4_RAW: return compression_type::LZ4;
+    case Compression::SNAPPY: return compression_type::SNAPPY;
+    case Compression::ZSTD: return compression_type::ZSTD;
+    case Compression::UNCOMPRESSED: return compression_type::NONE;
+    default: CUDF_FAIL("Unsupported compression type");
+  }
+}
+
 /**
  * @brief Functor which reduces two cumulative_page_info structs of the same key.
  */
@@ -759,9 +771,6 @@ std::vector<split_info> compute_page_splits_by_row(device_span
-    if (codec.num_pages > 0) {
-      debrotli_scratch.resize(cudf::io::detail::get_gpu_debrotli_scratch_size(codec.num_pages),
-                              stream);
-    }
   }
 
   // Dispatch batches of pages to decompress for each codec.
@@ -866,71 +871,17 @@ std::vector<split_info> compute_page_splits_by_row(device_span
     device_span<device_span<uint8_t const> const> d_comp_in_view{d_comp_in.data() + start_pos,
                                                                  codec.num_pages};
     device_span<device_span<uint8_t> const> d_comp_out_view(d_comp_out.data() + start_pos,
                                                             codec.num_pages);
     device_span<compression_result> d_comp_res_view(comp_res.data() + start_pos, codec.num_pages);
+    cudf::io::detail::decompress(from_parquet_compression(codec.compression_type),
+                                 d_comp_in_view,
+                                 d_comp_out_view,
+                                 d_comp_res_view,
+                                 codec.max_decompressed_size,
+                                 codec.total_decomp_size,
+                                 stream);
-    switch (codec.compression_type) {
-      case GZIP:
-        if (cudf::io::nvcomp_integration::is_all_enabled()) {
-          nvcomp::batched_decompress(nvcomp::compression_type::GZIP,
-                                     d_comp_in_view,
-                                     d_comp_out_view,
-                                     d_comp_res_view,
-                                     codec.max_decompressed_size,
-                                     codec.total_decomp_size,
-                                     stream);
-        } else {
-          gpuinflate(d_comp_in_view,
-                     d_comp_out_view,
-                     d_comp_res_view,
-                     cudf::io::detail::gzip_header_included::YES,
-                     stream);
-        }
-        break;
-      case SNAPPY:
-        if (cudf::io::nvcomp_integration::is_stable_enabled()) {
-          nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
-                                     d_comp_in_view,
-                                     d_comp_out_view,
-                                     d_comp_res_view,
-                                     codec.max_decompressed_size,
-                                     codec.total_decomp_size,
-                                     stream);
-        } else {
-          gpu_unsnap(d_comp_in_view, d_comp_out, d_comp_res_view, stream);
-        }
-        break;
-      case ZSTD:
-        nvcomp::batched_decompress(nvcomp::compression_type::ZSTD,
-                                   d_comp_in_view,
-                                   d_comp_out_view,
-                                   d_comp_res_view,
-                                   codec.max_decompressed_size,
-                                   codec.total_decomp_size,
-                                   stream);
-        break;
-      case BROTLI:
-        gpu_debrotli(d_comp_in_view,
-                     d_comp_out_view,
-                     d_comp_res_view,
-                     debrotli_scratch.data(),
-                     debrotli_scratch.size(),
-                     stream);
-        break;
-      case LZ4_RAW:
-        nvcomp::batched_decompress(nvcomp::compression_type::LZ4,
-                                   d_comp_in_view,
-                                   d_comp_out_view,
-                                   d_comp_res_view,
-                                   codec.max_decompressed_size,
-                                   codec.total_decomp_size,
-                                   stream);
-        break;
-      default: CUDF_FAIL("Unexpected decompression dispatch"); break;
-    }
     start_pos += codec.num_pages;
   }
 
@@ -1047,13 +998,6 @@ void detect_malformed_pages(device_span<PageInfo const> pages,
   }
 }
 
-struct decompression_info {
-  Compression codec;
-  size_t num_pages;
-  size_t max_page_decompressed_size;
-  size_t total_decompressed_size;
-};
-
 /**
  * @brief Functor which retrieves per-page decompression information.
  *
 */
@@ -1063,7 +1007,17 @@ struct get_decomp_info {
   __device__ decompression_info operator()(PageInfo const& p) const
   {
-    return {static_cast<Compression>(chunks[p.chunk_idx].codec),
+    auto const comp_type = [codec = chunks[p.chunk_idx].codec]() {
+      switch (codec) {
+        case SNAPPY: return compression_type::SNAPPY;
+        case GZIP: return compression_type::GZIP;
+        case BROTLI: return compression_type::BROTLI;
+        case ZSTD: return compression_type::ZSTD;
+        case LZ4_RAW: return compression_type::LZ4;
+        default: return compression_type::NONE;
+      }
+    }();
+    return {comp_type,
             1,
             static_cast<size_t>(p.uncompressed_page_size),
             static_cast<size_t>(p.uncompressed_page_size)};
@@ -1078,54 +1032,13 @@ struct decomp_sum {
   __device__ decompression_info operator()(decompression_info const& a,
                                            decompression_info const& b) const
   {
-    return {a.codec,
+    return {a.type,
             a.num_pages + b.num_pages,
             cuda::std::max(a.max_page_decompressed_size, b.max_page_decompressed_size),
             a.total_decompressed_size + b.total_decompressed_size};
   }
 };
 
-/**
- * @brief Functor which returns total scratch space required based on computed decompression_info
- * data.
- *
- */
-struct get_decomp_scratch {
-  size_t operator()(decompression_info const& di) const
-  {
-    switch (di.codec) {
-      case UNCOMPRESSED:
-      case GZIP: return 0;
-
-      case BROTLI: return cudf::io::detail::get_gpu_debrotli_scratch_size(di.num_pages);
-
-      case SNAPPY:
-        if (cudf::io::nvcomp_integration::is_stable_enabled()) {
-          return nvcomp::batched_decompress_temp_size(nvcomp::compression_type::SNAPPY,
-                                                      di.num_pages,
-                                                      di.max_page_decompressed_size,
-                                                      di.total_decompressed_size);
-        } else {
-          return 0;
-        }
-        break;
-
-      case ZSTD:
-        return nvcomp::batched_decompress_temp_size(nvcomp::compression_type::ZSTD,
-                                                    di.num_pages,
-                                                    di.max_page_decompressed_size,
-                                                    di.total_decompressed_size);
-      case LZ4_RAW:
-        return nvcomp::batched_decompress_temp_size(nvcomp::compression_type::LZ4,
-                                                    di.num_pages,
-                                                    di.max_page_decompressed_size,
-                                                    di.total_decompressed_size);
-
-      default: CUDF_FAIL("Invalid compression codec for parquet decompression");
-    }
-  }
-};
-
 /**
  * @brief Add the cost of decompression codec scratch space to the per-page cumulative
  * size information.
@@ -1152,14 +1065,14 @@ void include_decompression_scratch_size(device_span<ColumnChunkDesc const> chunk
                         cuda::std::equal_to{},
                         decomp_sum{});
 
-  // retrieve to host so we can call nvcomp to get compression scratch sizes
+  // retrieve to host so we can get compression scratch sizes
   auto h_decomp_info = cudf::detail::make_host_vector_sync(decomp_info, stream);
   auto temp_cost     = cudf::detail::make_host_vector<size_t>(pages.size(), stream);
   thrust::transform(thrust::host,
                     h_decomp_info.begin(),
                     h_decomp_info.end(),
                     temp_cost.begin(),
-                    get_decomp_scratch{});
+                    cudf::io::detail::get_decompression_scratch_size);
 
   // add to the cumulative_page_info data
   rmm::device_uvector<size_t> d_temp_cost = cudf::detail::make_device_uvector_async(
diff --git a/cpp/tests/io/comp/comp_test.cpp b/cpp/tests/io/comp/comp_test.cpp
index e3bee708485..736ec579ec5 100644
--- a/cpp/tests/io/comp/comp_test.cpp
+++ b/cpp/tests/io/comp/comp_test.cpp
@@ -14,6 +14,7 @@
  * limitations under the License.
*/ +#include "cudf/io/types.hpp" #include "io/comp/comp.hpp" #include "io/comp/gpuinflate.hpp" #include "io/comp/io_uncomp.hpp" @@ -78,9 +79,32 @@ struct DecompressTest : public cudf::test::BaseFixture { } }; -struct HostCompressTest : public cudf::test::BaseFixture { - HostCompressTest() { setenv("LIBCUDF_HOST_COMPRESSION", "ON", 1); } - ~HostCompressTest() override { unsetenv("LIBCUDF_HOST_COMPRESSION"); } +struct HostCompressTest : public cudf::test::BaseFixture, + public ::testing::WithParamInterface { + HostCompressTest() + { + setenv("LIBCUDF_HOST_COMPRESSION", "ON", 1); + setenv("LIBCUDF_NVCOMP_POLICY", "ALWAYS", 1); + } + ~HostCompressTest() override + { + unsetenv("LIBCUDF_HOST_COMPRESSION"); + unsetenv("LIBCUDF_NVCOMP_POLICY"); + } +}; + +struct HostDecompressTest : public cudf::test::BaseFixture, + public ::testing::WithParamInterface { + HostDecompressTest() + { + setenv("LIBCUDF_HOST_DECOMPRESSION", "ON", 1); + setenv("LIBCUDF_NVCOMP_POLICY", "ALWAYS", 1); + } + ~HostDecompressTest() override + { + unsetenv("LIBCUDF_HOST_DECOMPRESSION"); + unsetenv("LIBCUDF_NVCOMP_POLICY"); + } }; /** @@ -122,12 +146,7 @@ struct BrotliDecompressTest : public DecompressTest { rmm::device_buffer d_scratch{cudf::io::detail::get_gpu_debrotli_scratch_size(1), cudf::get_default_stream()}; - cudf::io::detail::gpu_debrotli(d_inf_in, - d_inf_out, - d_inf_stat, - d_scratch.data(), - d_scratch.size(), - cudf::get_default_stream()); + cudf::io::detail::gpu_debrotli(d_inf_in, d_inf_out, d_inf_stat, cudf::get_default_stream()); } }; @@ -229,23 +248,87 @@ TEST_F(NvcompConfigTest, Decompression) EXPECT_TRUE(decomp_disabled(compression_type::SNAPPY, {false, false})); } -TEST_F(HostCompressTest, SnappyCompression) +void roundtip_test(cudf::io::compression_type compression) { + auto const stream = cudf::get_default_stream(); + auto const mr = rmm::mr::get_current_device_resource(); std::vector expected; - expected.reserve(8 * (32 << 20)); - for (size_t size = 1; size < 32 << 20; size *= 2) { + expected.reserve(8 * (8 << 20)); + for (size_t size = 1; size < 8 << 20; size *= 2) { // Using number strings to generate data that is compressible, but not trivially so for (size_t i = size / 2; i < size; ++i) { auto const num_string = std::to_string(i); // Keep adding to the test data expected.insert(expected.end(), num_string.begin(), num_string.end()); } - auto const compressed = cudf::io::detail::compress( - cudf::io::compression_type::SNAPPY, expected, cudf::get_default_stream()); - auto const decompressed = - cudf::io::detail::decompress(cudf::io::compression_type::SNAPPY, compressed); - EXPECT_EQ(expected, decompressed); + if (cudf::io::detail::compress_max_allowed_chunk_size(compression) + .value_or(std::numeric_limits::max()) < expected.size()) { + // Skip if the data is too large for the compressor + return; + } + + auto d_comp = rmm::device_uvector( + cudf::io::detail::max_compressed_size(compression, expected.size()), stream, mr); + { + auto const d_orig = cudf::detail::make_device_uvector_async(expected, stream, mr); + auto hd_srcs = cudf::detail::hostdevice_vector>(1, stream); + hd_srcs[0] = d_orig; + hd_srcs.host_to_device_async(stream); + + auto hd_dsts = cudf::detail::hostdevice_vector>(1, stream); + hd_dsts[0] = d_comp; + hd_dsts.host_to_device_async(stream); + + auto hd_stats = cudf::detail::hostdevice_vector(1, stream); + hd_stats[0] = compression_result{0, compression_status::FAILURE}; + hd_stats.host_to_device_async(stream); + + cudf::io::detail::compress(compression, hd_srcs, hd_dsts, hd_stats, 
+      hd_stats.device_to_host_sync(stream);
+      ASSERT_EQ(hd_stats[0].status, compression_status::SUCCESS);
+      d_comp.resize(hd_stats[0].bytes_written, stream);
+    }
+
+    auto d_got = cudf::detail::hostdevice_vector<uint8_t>(expected.size(), stream);
+    {
+      auto hd_srcs = cudf::detail::hostdevice_vector<device_span<uint8_t const>>(1, stream);
+      hd_srcs[0]   = d_comp;
+      hd_srcs.host_to_device_async(stream);
+
+      auto hd_dsts = cudf::detail::hostdevice_vector<device_span<uint8_t>>(1, stream);
+      hd_dsts[0]   = d_got;
+      hd_dsts.host_to_device_async(stream);
+
+      auto hd_stats = cudf::detail::hostdevice_vector<compression_result>(1, stream);
+      hd_stats[0]   = compression_result{0, compression_status::FAILURE};
+      hd_stats.host_to_device_async(stream);
+
+      cudf::io::detail::decompress(
+        compression, hd_srcs, hd_dsts, hd_stats, expected.size(), expected.size(), stream);
+      hd_stats.device_to_host_sync(stream);
+      ASSERT_EQ(hd_stats[0].status, compression_status::SUCCESS);
+    }
+
+    auto const got = cudf::detail::make_std_vector_sync(d_got, stream);
+
+    EXPECT_EQ(expected, got);
   }
 }
 
+TEST_P(HostCompressTest, HostCompression) { roundtrip_test(GetParam()); }
+
+INSTANTIATE_TEST_CASE_P(HostCompression,
+                        HostCompressTest,
+                        ::testing::Values(cudf::io::compression_type::GZIP,
+                                          cudf::io::compression_type::SNAPPY));
+
+TEST_P(HostDecompressTest, HostDecompression) { roundtrip_test(GetParam()); }
+
+INSTANTIATE_TEST_CASE_P(HostDecompression,
+                        HostDecompressTest,
+                        ::testing::Values(cudf::io::compression_type::GZIP,
+                                          cudf::io::compression_type::SNAPPY,
+                                          cudf::io::compression_type::ZLIB,
+                                          cudf::io::compression_type::ZSTD));
+
 CUDF_TEST_PROGRAM_MAIN()