Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ add_parquet_test(arrow-metadata-test SOURCES arrow/arrow_metadata_test.cc
if(PARQUET_REQUIRE_ENCRYPTION)
add_parquet_test(encryption-test
SOURCES
encryption/bloom_filter_encryption_test.cc
encryption/encryption_internal_test.cc
encryption/write_configurations_test.cc
encryption/read_configurations_test.cc
Expand Down
48 changes: 23 additions & 25 deletions cpp/src/parquet/arrow/fuzz_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,23 @@ Status FuzzReadPageIndex(RowGroupPageIndexReader* reader, const SchemaDescriptor
return st;
}

// Exercise the bloom filter of a single column within one row group.
//
// Requests the bloom filter for `column` from `reader`; when a non-null
// filter comes back, it is probed with 100 hashes drawn from `hash_dist`
// using `rng`. The lookup results themselves are discarded.
//
// The work is wrapped in BEGIN/END_PARQUET_CATCH_EXCEPTIONS
// (NOTE(review): presumably these turn thrown Parquet exceptions into a
// returned Status -- confirm against the macro definitions). When nothing is
// thrown, the default-constructed Status is returned.
Status FuzzReadBloomFilter(RowGroupBloomFilterReader* reader, int column,
                           std::uniform_int_distribution<uint64_t>& hash_dist,
                           std::default_random_engine& rng) {
  Status result;
  BEGIN_PARQUET_CATCH_EXCEPTIONS
  std::unique_ptr<BloomFilter> filter = reader->GetColumnBloomFilter(column);
  // A null filter means there is nothing to probe for this column.
  if (filter) {
    int probes_left = 100;
    while (probes_left > 0) {
      filter->FindHash(hash_dist(rng));
      --probes_left;
    }
  }
  END_PARQUET_CATCH_EXCEPTIONS
  return result;
}

ReaderProperties MakeFuzzReaderProperties(MemoryPool* pool) {
FileDecryptionProperties::Builder builder;
builder.key_retriever(MakeKeyRetriever());
Expand Down Expand Up @@ -231,31 +248,12 @@ Status FuzzReader(const uint8_t* data, int64_t size) {
}
{
// Read and decode bloom filters
try {
auto& bloom_reader = pq_file_reader->GetBloomFilterReader();
std::uniform_int_distribution<uint64_t> hash_dist;
for (int i = 0; i < num_row_groups; ++i) {
auto bloom_rg = bloom_reader.RowGroup(i);
for (int j = 0; j < num_columns; ++j) {
std::unique_ptr<BloomFilter> bloom;
bloom = bloom_rg->GetColumnBloomFilter(j);
// If the column has a bloom filter, find a bunch of random hashes
if (bloom != nullptr) {
for (int k = 0; k < 100; ++k) {
bloom->FindHash(hash_dist(rng));
}
}
}
}
} catch (const ParquetException& exc) {
// XXX we just want to ignore encrypted bloom filters and validate the
// rest of the file; there is no better way of doing this until GH-46597
// is done.
// (also see GH-48334 for reading encrypted bloom filters)
if (std::string_view(exc.what())
.find("BloomFilter decryption is not yet supported") ==
std::string_view::npos) {
throw;
auto& bloom_reader = pq_file_reader->GetBloomFilterReader();
std::uniform_int_distribution<uint64_t> hash_dist;
for (int i = 0; i < num_row_groups; ++i) {
auto bloom_rg = bloom_reader.RowGroup(i);
for (int j = 0; j < num_columns; ++j) {
st &= FuzzReadBloomFilter(bloom_rg.get(), j, hash_dist, rng);
}
}
}
Expand Down
112 changes: 111 additions & 1 deletion cpp/src/parquet/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>

#include "arrow/result.h"
Expand All @@ -26,11 +27,28 @@
#include "generated/parquet_types.h"

#include "parquet/bloom_filter.h"
#include "parquet/encryption/internal_file_decryptor.h"
#include "parquet/exception.h"
#include "parquet/thrift_internal.h"
#include "parquet/xxhasher.h"

namespace parquet {
namespace {

constexpr int32_t kCiphertextLengthSize = 4;

// Decode the length prefix at the start of `data` and return the total size
// it implies.
//
// The first kCiphertextLengthSize bytes of `data` are read as an unsigned
// little-endian integer (data[0] is the least significant byte). The returned
// value is that integer plus kCiphertextLengthSize, i.e. it also counts the
// prefix bytes themselves. The sum is computed in 64 bits, so it cannot wrap.
//
// @param data   Buffer beginning with the length prefix.
// @param length Number of readable bytes in `data`.
// @throws ParquetException if `length` is smaller than kCiphertextLengthSize.
int64_t ParseTotalCiphertextSize(const uint8_t* data, int64_t length) {
  if (length < kCiphertextLengthSize) {
    throw ParquetException("Ciphertext length buffer is too small");
  }
  // Fold the bytes in from most significant to least significant.
  uint32_t payload_size = 0;
  for (int idx = kCiphertextLengthSize - 1; idx >= 0; --idx) {
    payload_size = (payload_size << 8) | static_cast<uint32_t>(data[idx]);
  }
  return kCiphertextLengthSize + static_cast<int64_t>(payload_size);
}

} // namespace

constexpr uint32_t BlockSplitBloomFilter::SALT[kBitsSetPerBlock];

BlockSplitBloomFilter::BlockSplitBloomFilter(::arrow::MemoryPool* pool)
Expand Down Expand Up @@ -106,7 +124,99 @@ static ::arrow::Status ValidateBloomFilterHeader(

BlockSplitBloomFilter BlockSplitBloomFilter::Deserialize(
const ReaderProperties& properties, ArrowInputStream* input,
std::optional<int64_t> bloom_filter_length) {
std::optional<int64_t> bloom_filter_length, Decryptor* header_decryptor,
Decryptor* bitset_decryptor) {
if (header_decryptor != nullptr || bitset_decryptor != nullptr) {
if (header_decryptor == nullptr || bitset_decryptor == nullptr) {
throw ParquetException("Bloom filter decryptors must be both provided");
}

// Encrypted path: header and bitset are encrypted separately.
ThriftDeserializer deserializer(properties);
format::BloomFilterHeader header;

// Read the length-prefixed ciphertext for the header.
PARQUET_ASSIGN_OR_THROW(auto length_buf, input->Read(kCiphertextLengthSize));
if (ARROW_PREDICT_FALSE(length_buf->size() < kCiphertextLengthSize)) {
throw ParquetException("Bloom filter header read failed: not enough data");
}

const int64_t header_cipher_total_len =
ParseTotalCiphertextSize(length_buf->data(), length_buf->size());
if (ARROW_PREDICT_FALSE(header_cipher_total_len >
std::numeric_limits<int32_t>::max())) {
throw ParquetException("Bloom filter header ciphertext length overflows int32");
}
if (bloom_filter_length && header_cipher_total_len > *bloom_filter_length) {
throw ParquetException(
"Bloom filter length less than encrypted bloom filter header length");
}
// Read the full header ciphertext and decrypt the Thrift header.
auto header_cipher_buf =
AllocateBuffer(properties.memory_pool(), header_cipher_total_len);
std::memcpy(header_cipher_buf->mutable_data(), length_buf->data(),
kCiphertextLengthSize);
const int64_t header_cipher_remaining =
header_cipher_total_len - kCiphertextLengthSize;
PARQUET_ASSIGN_OR_THROW(
auto read_size,
input->Read(header_cipher_remaining,
header_cipher_buf->mutable_data() + kCiphertextLengthSize));
if (ARROW_PREDICT_FALSE(read_size < header_cipher_remaining)) {
throw ParquetException("Bloom filter header read failed: not enough data");
}

uint32_t header_cipher_len = static_cast<uint32_t>(header_cipher_total_len);
try {
deserializer.DeserializeMessage(
reinterpret_cast<const uint8_t*>(header_cipher_buf->data()), &header_cipher_len,
&header, header_decryptor);
DCHECK_EQ(header_cipher_len, header_cipher_total_len);
} catch (std::exception& e) {
std::stringstream ss;
ss << "Deserializing bloom filter header failed.\n" << e.what();
throw ParquetException(ss.str());
}
PARQUET_THROW_NOT_OK(ValidateBloomFilterHeader(header));

const int32_t bloom_filter_size = header.numBytes;
const int32_t bitset_cipher_len =
bitset_decryptor->CiphertextLength(bloom_filter_size);
const int64_t total_cipher_len =
header_cipher_total_len + static_cast<int64_t>(bitset_cipher_len);
if (bloom_filter_length && *bloom_filter_length != total_cipher_len) {
std::stringstream ss;
ss << "Bloom filter length (" << bloom_filter_length.value()
<< ") does not match the actual bloom filter (size: " << total_cipher_len
<< ").";
throw ParquetException(ss.str());
}

// Read and decrypt the bitset bytes.
PARQUET_ASSIGN_OR_THROW(auto bitset_cipher_buf, input->Read(bitset_cipher_len));
if (ARROW_PREDICT_FALSE(bitset_cipher_buf->size() < bitset_cipher_len)) {
throw ParquetException("Bloom Filter read failed: not enough data");
}

const int32_t bitset_plain_len =
bitset_decryptor->PlaintextLength(static_cast<int32_t>(bitset_cipher_len));
if (ARROW_PREDICT_FALSE(bitset_plain_len != bloom_filter_size)) {
throw ParquetException("Bloom filter bitset size does not match header");
}

auto bitset_plain_buf = AllocateBuffer(properties.memory_pool(), bitset_plain_len);
int32_t decrypted_len =
bitset_decryptor->Decrypt(bitset_cipher_buf->span_as<const uint8_t>(),
bitset_plain_buf->mutable_span_as<uint8_t>());
if (ARROW_PREDICT_FALSE(decrypted_len != bitset_plain_len)) {
throw ParquetException("Bloom filter bitset decryption failed");
}

// Initialize the bloom filter from the decrypted bitset.
BlockSplitBloomFilter bloom_filter(properties.memory_pool());
bloom_filter.Init(bitset_plain_buf->data(), bloom_filter_size);
return bloom_filter;
}
ThriftDeserializer deserializer(properties);
format::BloomFilterHeader header;
int64_t bloom_filter_header_read_size = 0;
Expand Down
9 changes: 8 additions & 1 deletion cpp/src/parquet/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

namespace parquet {

class Decryptor;

// A Bloom filter is a compact structure to indicate whether an item is not in a set or
// probably in a set. The Bloom filter usually consists of a bit set that represents a
// set of elements, a hash strategy and a Bloom filter algorithm.
Expand Down Expand Up @@ -323,10 +325,15 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
/// @param input_stream The input stream from which to construct the bloom filter.
/// @param bloom_filter_length The length of the serialized bloom filter including
/// header.
/// @param header_decryptor Optional decryptor for the serialized header.
/// @param bitset_decryptor Optional decryptor for the bitset bytes.
/// Both decryptors must be provided for encrypted bloom filters, or both be null
/// for unencrypted. Providing only one will throw ParquetException.
/// @return The BlockSplitBloomFilter.
static BlockSplitBloomFilter Deserialize(
const ReaderProperties& properties, ArrowInputStream* input_stream,
std::optional<int64_t> bloom_filter_length = std::nullopt);
std::optional<int64_t> bloom_filter_length = std::nullopt,
Decryptor* header_decryptor = NULLPTR, Decryptor* bitset_decryptor = NULLPTR);

private:
inline void InsertHashImpl(uint64_t hash);
Expand Down
57 changes: 41 additions & 16 deletions cpp/src/parquet/bloom_filter_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "parquet/bloom_filter_reader.h"
#include "parquet/bloom_filter.h"
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_decryptor.h"
#include "parquet/exception.h"
#include "parquet/metadata.h"

Expand All @@ -26,10 +28,14 @@ class RowGroupBloomFilterReaderImpl final : public RowGroupBloomFilterReader {
public:
RowGroupBloomFilterReaderImpl(std::shared_ptr<::arrow::io::RandomAccessFile> input,
std::shared_ptr<RowGroupMetaData> row_group_metadata,
const ReaderProperties& properties)
const ReaderProperties& properties,
int32_t row_group_ordinal,
std::shared_ptr<InternalFileDecryptor> file_decryptor)
: input_(std::move(input)),
row_group_metadata_(std::move(row_group_metadata)),
properties_(properties) {}
properties_(properties),
row_group_ordinal_(row_group_ordinal),
file_decryptor_(std::move(file_decryptor)) {}

std::unique_ptr<BloomFilter> GetColumnBloomFilter(int i) override;

Expand All @@ -42,6 +48,12 @@ class RowGroupBloomFilterReaderImpl final : public RowGroupBloomFilterReader {

/// Reader properties used to deserialize thrift object.
const ReaderProperties& properties_;

/// The ordinal of the row group in the file.
int32_t row_group_ordinal_;

/// File-level decryptor.
std::shared_ptr<InternalFileDecryptor> file_decryptor_;
};

std::unique_ptr<BloomFilter> RowGroupBloomFilterReaderImpl::GetColumnBloomFilter(int i) {
Expand All @@ -50,11 +62,6 @@ std::unique_ptr<BloomFilter> RowGroupBloomFilterReaderImpl::GetColumnBloomFilter
}

auto col_chunk = row_group_metadata_->ColumnChunk(i);
std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col_chunk->crypto_metadata();
if (crypto_metadata != nullptr) {
ParquetException::NYI("BloomFilter decryption is not yet supported");
}

auto bloom_filter_offset = col_chunk->bloom_filter_offset();
if (!bloom_filter_offset.has_value()) {
return nullptr;
Expand All @@ -76,10 +83,28 @@ std::unique_ptr<BloomFilter> RowGroupBloomFilterReaderImpl::GetColumnBloomFilter
"bloom filter length + bloom filter offset greater than file size");
}
}
auto stream = ::arrow::io::RandomAccessFile::GetStream(
input_, *bloom_filter_offset, file_size - *bloom_filter_offset);
std::unique_ptr<ColumnCryptoMetaData> crypto_metadata = col_chunk->crypto_metadata();
std::unique_ptr<Decryptor> header_decryptor =
InternalFileDecryptor::GetColumnMetaDecryptorFactory(file_decryptor_.get(),
crypto_metadata.get())();
std::unique_ptr<Decryptor> bitset_decryptor =
InternalFileDecryptor::GetColumnDataDecryptorFactory(file_decryptor_.get(),
crypto_metadata.get())();
if (header_decryptor != nullptr) {
UpdateDecryptor(header_decryptor.get(), row_group_ordinal_, static_cast<int16_t>(i),
encryption::kBloomFilterHeader);
}
if (bitset_decryptor != nullptr) {
UpdateDecryptor(bitset_decryptor.get(), row_group_ordinal_, static_cast<int16_t>(i),
encryption::kBloomFilterBitset);
}
const int64_t stream_length =
bloom_filter_length ? *bloom_filter_length : file_size - *bloom_filter_offset;
auto stream = ::arrow::io::RandomAccessFile::GetStream(input_, *bloom_filter_offset,
stream_length);
auto bloom_filter =
BlockSplitBloomFilter::Deserialize(properties_, stream->get(), bloom_filter_length);
BlockSplitBloomFilter::Deserialize(properties_, stream->get(), bloom_filter_length,
header_decryptor.get(), bitset_decryptor.get());
return std::make_unique<BlockSplitBloomFilter>(std::move(bloom_filter));
}

Expand All @@ -91,11 +116,8 @@ class BloomFilterReaderImpl final : public BloomFilterReader {
std::shared_ptr<InternalFileDecryptor> file_decryptor)
: input_(std::move(input)),
file_metadata_(std::move(file_metadata)),
properties_(properties) {
if (file_decryptor != nullptr) {
ParquetException::NYI("BloomFilter decryption is not yet supported");
}
}
properties_(properties),
file_decryptor_(std::move(file_decryptor)) {}

std::shared_ptr<RowGroupBloomFilterReader> RowGroup(int i) {
if (i < 0 || i >= file_metadata_->num_row_groups()) {
Expand All @@ -104,7 +126,7 @@ class BloomFilterReaderImpl final : public BloomFilterReader {

auto row_group_metadata = file_metadata_->RowGroup(i);
return std::make_shared<RowGroupBloomFilterReaderImpl>(
input_, std::move(row_group_metadata), properties_);
input_, std::move(row_group_metadata), properties_, i, file_decryptor_);
}

private:
Expand All @@ -116,6 +138,9 @@ class BloomFilterReaderImpl final : public BloomFilterReader {

/// Reader properties used to deserialize thrift object.
const ReaderProperties& properties_;

/// File-level decryptor, if any.
std::shared_ptr<InternalFileDecryptor> file_decryptor_;
};

std::unique_ptr<BloomFilterReader> BloomFilterReader::Make(
Expand Down
Loading
Loading