diff --git a/cpp/src/parquet/CMakeLists.txt b/cpp/src/parquet/CMakeLists.txt index 6c1550dcc2f..df4c48c0c41 100644 --- a/cpp/src/parquet/CMakeLists.txt +++ b/cpp/src/parquet/CMakeLists.txt @@ -415,6 +415,7 @@ add_parquet_test(arrow-metadata-test SOURCES arrow/arrow_metadata_test.cc if(PARQUET_REQUIRE_ENCRYPTION) add_parquet_test(encryption-test SOURCES + encryption/bloom_filter_encryption_test.cc encryption/encryption_internal_test.cc encryption/write_configurations_test.cc encryption/read_configurations_test.cc diff --git a/cpp/src/parquet/arrow/fuzz_internal.cc b/cpp/src/parquet/arrow/fuzz_internal.cc index 8618a85fcca..4f457c25b82 100644 --- a/cpp/src/parquet/arrow/fuzz_internal.cc +++ b/cpp/src/parquet/arrow/fuzz_internal.cc @@ -179,6 +179,23 @@ Status FuzzReadPageIndex(RowGroupPageIndexReader* reader, const SchemaDescriptor return st; } +Status FuzzReadBloomFilter(RowGroupBloomFilterReader* reader, int column, + std::uniform_int_distribution& hash_dist, + std::default_random_engine& rng) { + Status st; + BEGIN_PARQUET_CATCH_EXCEPTIONS + std::unique_ptr bloom; + bloom = reader->GetColumnBloomFilter(column); + // If the column has a bloom filter, find a bunch of random hashes + if (bloom != nullptr) { + for (int k = 0; k < 100; ++k) { + bloom->FindHash(hash_dist(rng)); + } + } + END_PARQUET_CATCH_EXCEPTIONS + return st; +} + ReaderProperties MakeFuzzReaderProperties(MemoryPool* pool) { FileDecryptionProperties::Builder builder; builder.key_retriever(MakeKeyRetriever()); @@ -231,31 +248,12 @@ Status FuzzReader(const uint8_t* data, int64_t size) { } { // Read and decode bloom filters - try { - auto& bloom_reader = pq_file_reader->GetBloomFilterReader(); - std::uniform_int_distribution hash_dist; - for (int i = 0; i < num_row_groups; ++i) { - auto bloom_rg = bloom_reader.RowGroup(i); - for (int j = 0; j < num_columns; ++j) { - std::unique_ptr bloom; - bloom = bloom_rg->GetColumnBloomFilter(j); - // If the column has a bloom filter, find a bunch of random hashes - if (bloom != nullptr) { - for (int k = 0; k < 100; ++k) { - bloom->FindHash(hash_dist(rng)); - } - } - } - } - } catch (const ParquetException& exc) { - // XXX we just want to ignore encrypted bloom filters and validate the - // rest of the file; there is no better way of doing this until GH-46597 - // is done. - // (also see GH-48334 for reading encrypted bloom filters) - if (std::string_view(exc.what()) - .find("BloomFilter decryption is not yet supported") == - std::string_view::npos) { - throw; + auto& bloom_reader = pq_file_reader->GetBloomFilterReader(); + std::uniform_int_distribution hash_dist; + for (int i = 0; i < num_row_groups; ++i) { + auto bloom_rg = bloom_reader.RowGroup(i); + for (int j = 0; j < num_columns; ++j) { + st &= FuzzReadBloomFilter(bloom_rg.get(), j, hash_dist, rng); } } } diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc index e8011b5fc80..24d6da55a37 100644 --- a/cpp/src/parquet/bloom_filter.cc +++ b/cpp/src/parquet/bloom_filter.cc @@ -17,6 +17,7 @@ #include #include +#include #include #include "arrow/result.h" @@ -26,11 +27,28 @@ #include "generated/parquet_types.h" #include "parquet/bloom_filter.h" +#include "parquet/encryption/internal_file_decryptor.h" #include "parquet/exception.h" #include "parquet/thrift_internal.h" #include "parquet/xxhasher.h" namespace parquet { +namespace { + +constexpr int32_t kCiphertextLengthSize = 4; + +int64_t ParseTotalCiphertextSize(const uint8_t* data, int64_t length) { + if (length < kCiphertextLengthSize) { + throw ParquetException("Ciphertext length buffer is too small"); + } + uint32_t buffer_size = + (static_cast(data[3]) << 24) | (static_cast(data[2]) << 16) | + (static_cast(data[1]) << 8) | (static_cast(data[0])); + return static_cast(buffer_size) + kCiphertextLengthSize; +} + +} // namespace + constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock]; BlockSplitBloomFilter::BlockSplitBloomFilter(::arrow::MemoryPool* pool) @@ -106,7 +124,99 @@ static ::arrow::Status ValidateBloomFilterHeader( BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize( const ReaderProperties& properties, ArrowInputStream* input, - std::optional bloom_filter_length) { + std::optional bloom_filter_length, Decryptor* header_decryptor, + Decryptor* bitset_decryptor) { + if (header_decryptor != nullptr || bitset_decryptor != nullptr) { + if (header_decryptor == nullptr || bitset_decryptor == nullptr) { + throw ParquetException("Bloom filter decryptors must be both provided"); + } + + // Encrypted path: header and bitset are encrypted separately. + ThriftDeserializer deserializer(properties); + format::BloomFilterHeader header; + + // Read the length-prefixed ciphertext for the header. + PARQUET_ASSIGN_OR_THROW(auto length_buf, input->Read(kCiphertextLengthSize)); + if (ARROW_PREDICT_FALSE(length_buf->size() < kCiphertextLengthSize)) { + throw ParquetException("Bloom filter header read failed: not enough data"); + } + + const int64_t header_cipher_total_len = + ParseTotalCiphertextSize(length_buf->data(), length_buf->size()); + if (ARROW_PREDICT_FALSE(header_cipher_total_len > + std::numeric_limits::max())) { + throw ParquetException("Bloom filter header ciphertext length overflows int32"); + } + if (bloom_filter_length && header_cipher_total_len > *bloom_filter_length) { + throw ParquetException( + "Bloom filter length less than encrypted bloom filter header length"); + } + // Read the full header ciphertext and decrypt the Thrift header. + auto header_cipher_buf = + AllocateBuffer(properties.memory_pool(), header_cipher_total_len); + std::memcpy(header_cipher_buf->mutable_data(), length_buf->data(), + kCiphertextLengthSize); + const int64_t header_cipher_remaining = + header_cipher_total_len - kCiphertextLengthSize; + PARQUET_ASSIGN_OR_THROW( + auto read_size, + input->Read(header_cipher_remaining, + header_cipher_buf->mutable_data() + kCiphertextLengthSize)); + if (ARROW_PREDICT_FALSE(read_size < header_cipher_remaining)) { + throw ParquetException("Bloom filter header read failed: not enough data"); + } + + uint32_t header_cipher_len = static_cast(header_cipher_total_len); + try { + deserializer.DeserializeMessage( + reinterpret_cast(header_cipher_buf->data()), &header_cipher_len, + &header, header_decryptor); + DCHECK_EQ(header_cipher_len, header_cipher_total_len); + } catch (std::exception& e) { + std::stringstream ss; + ss << "Deserializing bloom filter header failed.\n" << e.what(); + throw ParquetException(ss.str()); + } + PARQUET_THROW_NOT_OK(ValidateBloomFilterHeader(header)); + + const int32_t bloom_filter_size = header.numBytes; + const int32_t bitset_cipher_len = + bitset_decryptor->CiphertextLength(bloom_filter_size); + const int64_t total_cipher_len = + header_cipher_total_len + static_cast(bitset_cipher_len); + if (bloom_filter_length && *bloom_filter_length != total_cipher_len) { + std::stringstream ss; + ss << "Bloom filter length (" << bloom_filter_length.value() + << ") does not match the actual bloom filter (size: " << total_cipher_len + << ")."; + throw ParquetException(ss.str()); + } + + // Read and decrypt the bitset bytes. + PARQUET_ASSIGN_OR_THROW(auto bitset_cipher_buf, input->Read(bitset_cipher_len)); + if (ARROW_PREDICT_FALSE(bitset_cipher_buf->size() < bitset_cipher_len)) { + throw ParquetException("Bloom Filter read failed: not enough data"); + } + + const int32_t bitset_plain_len = + bitset_decryptor->PlaintextLength(static_cast(bitset_cipher_len)); + if (ARROW_PREDICT_FALSE(bitset_plain_len != bloom_filter_size)) { + throw ParquetException("Bloom filter bitset size does not match header"); + } + + auto bitset_plain_buf = AllocateBuffer(properties.memory_pool(), bitset_plain_len); + int32_t decrypted_len = + bitset_decryptor->Decrypt(bitset_cipher_buf->span_as(), + bitset_plain_buf->mutable_span_as()); + if (ARROW_PREDICT_FALSE(decrypted_len != bitset_plain_len)) { + throw ParquetException("Bloom filter bitset decryption failed"); + } + + // Initialize the bloom filter from the decrypted bitset. + BlockSplitBloomFilter bloom_filter(properties.memory_pool()); + bloom_filter.Init(bitset_plain_buf->data(), bloom_filter_size); + return bloom_filter; + } ThriftDeserializer deserializer(properties); format::BloomFilterHeader header; int64_t bloom_filter_header_read_size = 0; diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h index f1a74e4f5e4..a856c5d01a4 100644 --- a/cpp/src/parquet/bloom_filter.h +++ b/cpp/src/parquet/bloom_filter.h @@ -29,6 +29,8 @@ namespace parquet { +class Decryptor; + // A Bloom filter is a compact structure to indicate whether an item is not in a set or // probably in a set. The Bloom filter usually consists of a bit set that represents a // set of elements, a hash strategy and a Bloom filter algorithm. @@ -323,10 +325,15 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter { /// @param input_stream The input stream from which to construct the bloom filter. /// @param bloom_filter_length The length of the serialized bloom filter including /// header. + /// @param header_decryptor Optional decryptor for the serialized header. + /// @param bitset_decryptor Optional decryptor for the bitset bytes. + /// Both decryptors must be provided for encrypted bloom filters, or both be null + /// for unencrypted. Providing only one will throw ParquetException. /// @return The BlockSplitBloomFilter. static BlockSplitBloomFilter Deserialize( const ReaderProperties& properties, ArrowInputStream* input_stream, - std::optional bloom_filter_length = std::nullopt); + std::optional bloom_filter_length = std::nullopt, + Decryptor* header_decryptor = NULLPTR, Decryptor* bitset_decryptor = NULLPTR); private: inline void InsertHashImpl(uint64_t hash); diff --git a/cpp/src/parquet/bloom_filter_reader.cc b/cpp/src/parquet/bloom_filter_reader.cc index 0b1bc556b4a..2fcdff532f6 100644 --- a/cpp/src/parquet/bloom_filter_reader.cc +++ b/cpp/src/parquet/bloom_filter_reader.cc @@ -17,6 +17,8 @@ #include "parquet/bloom_filter_reader.h" #include "parquet/bloom_filter.h" +#include "parquet/encryption/encryption_internal.h" +#include "parquet/encryption/internal_file_decryptor.h" #include "parquet/exception.h" #include "parquet/metadata.h" @@ -26,10 +28,14 @@ class RowGroupBloomFilterReaderImpl final : public RowGroupBloomFilterReader { public: RowGroupBloomFilterReaderImpl(std::shared_ptr<::arrow::io::RandomAccessFile> input, std::shared_ptr row_group_metadata, - const ReaderProperties& properties) + const ReaderProperties& properties, + int32_t row_group_ordinal, + std::shared_ptr file_decryptor) : input_(std::move(input)), row_group_metadata_(std::move(row_group_metadata)), - properties_(properties) {} + properties_(properties), + row_group_ordinal_(row_group_ordinal), + file_decryptor_(std::move(file_decryptor)) {} std::unique_ptr GetColumnBloomFilter(int i) override; @@ -42,6 +48,12 @@ class RowGroupBloomFilterReaderImpl final : public RowGroupBloomFilterReader { /// Reader properties used to deserialize thrift object. const ReaderProperties& properties_; + + /// The ordinal of the row group in the file. + int32_t row_group_ordinal_; + + /// File-level decryptor. + std::shared_ptr file_decryptor_; }; std::unique_ptr RowGroupBloomFilterReaderImpl::GetColumnBloomFilter(int i) { @@ -50,11 +62,6 @@ std::unique_ptr RowGroupBloomFilterReaderImpl::GetColumnBloomFilter } auto col_chunk = row_group_metadata_->ColumnChunk(i); - std::unique_ptr crypto_metadata = col_chunk->crypto_metadata(); - if (crypto_metadata != nullptr) { - ParquetException::NYI("BloomFilter decryption is not yet supported"); - } - auto bloom_filter_offset = col_chunk->bloom_filter_offset(); if (!bloom_filter_offset.has_value()) { return nullptr; @@ -76,10 +83,28 @@ std::unique_ptr RowGroupBloomFilterReaderImpl::GetColumnBloomFilter "bloom filter length + bloom filter offset greater than file size"); } } - auto stream = ::arrow::io::RandomAccessFile::GetStream( - input_, *bloom_filter_offset, file_size - *bloom_filter_offset); + std::unique_ptr crypto_metadata = col_chunk->crypto_metadata(); + std::unique_ptr header_decryptor = + InternalFileDecryptor::GetColumnMetaDecryptorFactory(file_decryptor_.get(), + crypto_metadata.get())(); + std::unique_ptr bitset_decryptor = + InternalFileDecryptor::GetColumnDataDecryptorFactory(file_decryptor_.get(), + crypto_metadata.get())(); + if (header_decryptor != nullptr) { + UpdateDecryptor(header_decryptor.get(), row_group_ordinal_, static_cast(i), + encryption::kBloomFilterHeader); + } + if (bitset_decryptor != nullptr) { + UpdateDecryptor(bitset_decryptor.get(), row_group_ordinal_, static_cast(i), + encryption::kBloomFilterBitset); + } + const int64_t stream_length = + bloom_filter_length ? *bloom_filter_length : file_size - *bloom_filter_offset; + auto stream = ::arrow::io::RandomAccessFile::GetStream(input_, *bloom_filter_offset, + stream_length); auto bloom_filter = - BlockSplitBloomFilter::Deserialize(properties_, stream->get(), bloom_filter_length); + BlockSplitBloomFilter::Deserialize(properties_, stream->get(), bloom_filter_length, + header_decryptor.get(), bitset_decryptor.get()); return std::make_unique(std::move(bloom_filter)); } @@ -91,11 +116,8 @@ class BloomFilterReaderImpl final : public BloomFilterReader { std::shared_ptr file_decryptor) : input_(std::move(input)), file_metadata_(std::move(file_metadata)), - properties_(properties) { - if (file_decryptor != nullptr) { - ParquetException::NYI("BloomFilter decryption is not yet supported"); - } - } + properties_(properties), + file_decryptor_(std::move(file_decryptor)) {} std::shared_ptr RowGroup(int i) { if (i < 0 || i >= file_metadata_->num_row_groups()) { @@ -104,7 +126,7 @@ class BloomFilterReaderImpl final : public BloomFilterReader { auto row_group_metadata = file_metadata_->RowGroup(i); return std::make_shared( - input_, std::move(row_group_metadata), properties_); + input_, std::move(row_group_metadata), properties_, i, file_decryptor_); } private: @@ -116,6 +138,9 @@ class BloomFilterReaderImpl final : public BloomFilterReader { /// Reader properties used to deserialize thrift object. const ReaderProperties& properties_; + + /// File-level decryptor, if any. + std::shared_ptr file_decryptor_; }; std::unique_ptr BloomFilterReader::Make( diff --git a/cpp/src/parquet/encryption/bloom_filter_encryption_test.cc b/cpp/src/parquet/encryption/bloom_filter_encryption_test.cc new file mode 100644 index 00000000000..9a49e7277b6 --- /dev/null +++ b/cpp/src/parquet/encryption/bloom_filter_encryption_test.cc @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include + +#include "arrow/io/file.h" + +#include "parquet/bloom_filter.h" +#include "parquet/bloom_filter_reader.h" +#include "parquet/encryption/test_encryption_util.h" +#include "parquet/file_reader.h" +#include "parquet/properties.h" + +namespace parquet::encryption::test { +namespace { + +std::shared_ptr BuildDecryptionProperties() { + // Map test key ids to fixed test keys for decrypting the file footer and columns. + std::shared_ptr kr = + std::make_shared(); + kr->PutKey(kFooterMasterKeyId, kFooterEncryptionKey); + kr->PutKey(kColumnMasterKeyIds[0], kColumnEncryptionKey1); + kr->PutKey(kColumnMasterKeyIds[1], kColumnEncryptionKey2); + + parquet::FileDecryptionProperties::Builder builder; + return builder + .key_retriever(std::static_pointer_cast(kr)) + ->build(); +} + +} // namespace + +// Read Bloom filters from an encrypted parquet-testing file. +// The test data enables Bloom filters for double_field and float_field only. +TEST(EncryptedBloomFilterReader, ReadEncryptedBloomFilter) { + const std::string file_path = + data_file("encrypt_columns_and_footer_bloom_filter.parquet.encrypted"); + + parquet::ReaderProperties reader_properties = parquet::default_reader_properties(); + reader_properties.file_decryption_properties(BuildDecryptionProperties()); + + PARQUET_ASSIGN_OR_THROW(auto source, ::arrow::io::ReadableFile::Open( + file_path, reader_properties.memory_pool())); + auto file_reader = parquet::ParquetFileReader::Open(source, reader_properties); + auto file_metadata = file_reader->metadata(); + + ASSERT_EQ(file_metadata->num_columns(), 4); + ASSERT_GE(file_metadata->num_row_groups(), 1); + + auto& bloom_filter_reader = file_reader->GetBloomFilterReader(); + auto row_group_0 = bloom_filter_reader.RowGroup(0); + ASSERT_NE(nullptr, row_group_0); + + auto double_filter = row_group_0->GetColumnBloomFilter(0); + auto float_filter = row_group_0->GetColumnBloomFilter(1); + auto int32_filter = row_group_0->GetColumnBloomFilter(2); + auto name_filter = row_group_0->GetColumnBloomFilter(3); + + // double_field and float_field have Bloom filters; the others do not. + ASSERT_NE(nullptr, double_filter); + ASSERT_NE(nullptr, float_filter); + ASSERT_EQ(nullptr, int32_filter); + ASSERT_EQ(nullptr, name_filter); + + // Values follow a simple pattern in the test data. + for (int i : {0, 1, 7, 42}) { + const double value = static_cast(i) + 0.5; + EXPECT_TRUE(double_filter->FindHash(double_filter->Hash(value))); + } + + for (int i : {0, 2, 5, 10}) { + const float value = static_cast(i) + 0.25f; + EXPECT_TRUE(float_filter->FindHash(float_filter->Hash(value))); + } +} + +} // namespace parquet::encryption::test