GH-29238 [C++][Dataset][Parquet] Support parquet modular encryption in the new Dataset API #34616
Merged: jorisvandenbossche merged 45 commits into apache:main from tolleybot:dataset_encryption on Oct 11, 2023.
Commits (45)
3ff1617 (parent ed87a5b7f5ee1081d5613532c28de8e687b8e397)
tolleybot 32cdd0e
Update python/pyarrow/_dataset_parquet.pyx
tolleybot abdb585
Update python/pyarrow/_dataset_parquet.pyx
tolleybot 67ecb48
Update python/pyarrow/_dataset_parquet.pyx
tolleybot 0c719c6
Removed the Setup(..) function from ParquetEncryptionConfig, and Parq…
tolleybot 2f0aaa2
Merge branch 'dataset_encryption' of https://github.com/tolleybot/arr…
tolleybot 8440f51
Removed nogil from shared_ptr[CPyCryptoFactory] unwrap(self)
tolleybot e13f384
removed arrrow/api.h, and arrow/dataset/api.h from file_parquet_encry…
tolleybot 992c5a8
Append underscores in members of classes in src/arrow/dataset/file_pa…
tolleybot a2056a3
in file_parquet_encryption_test.cc I added some non-unique values in …
tolleybot a787728
updated parquet_encryption_config.h docstrings
tolleybot f3867cc
formatting for parquet_encrytion_config.h
tolleybot d6d3290
updated _dataset_parquet.pyx, ParquetDecryptionConfig, ParquetEncrypt…
tolleybot 98ba975
Fixed issue in ReadSingleFile Test in src/arrow/dataset/file_parquet_…
tolleybot d5ba855
removed nogil from shared_ptr[CPyCryptoFactory] unwrap(self) in parqu…
tolleybot 7d42aa3
Replace non-existent doxygen command
anjakefala 48a20f7
Run linter
anjakefala 23f0b40
Structure doxygen declarations so variables can be found
anjakefala 4a1daf0
Removing unneeded comments from file_parquet_test.cc
tolleybot 12d7660
Merge branch 'dataset_encryption' of https://github.com/tolleybot/arr…
tolleybot a70c0f9
changes to move common unwrap functions out of classes
tolleybot 77a818b
moved unwrap functions out of ParquetEncryptionConfig and ParquetDecr…
tolleybot f7d39a2
Merge branch 'master' into dataset_encryption
tolleybot 7d78847
1. Updated file_parquet.cc to change the type of exception due to the…
tolleybot 6a69ae1
formatting for _dataset_parquet.pyx
tolleybot c3fe65a
Moving docstring in parquet_encryption_config.h to correct position.
tolleybot fa9054a
Move property strings above the struct variables
anjakefala d2f1584
Merge pull request #3 from anjakefala/dataset_encryption
tolleybot 9e25c5e
Update cpp/src/arrow/dataset/parquet_encryption_config.h
tolleybot 5758119
Renaming encryption_enabled to encryption_unavailable in test_dataset…
tolleybot 36ff914
add set_encryption_config to dataset_parquet.pyx ParquetFileWriteOptions
tolleybot 0fa69e9
adding _set_encryption_config to ParquetFileWriteOptions to make sure…
tolleybot b2bb9f8
attempt to refactor cython code with separate files for with/without …
jorisvandenbossche 1462a4f
use dynamic python import
jorisvandenbossche 6655b28
update for setting encryption config as well
jorisvandenbossche 1c8aa1c
some cleanup
jorisvandenbossche 1f8ee19
update test to ensure written dataset is encrypted
jorisvandenbossche d74de45
updated test_dataset_encryption.py to validate that a dataset has enc…
tolleybot 46cc668
validating encryption is enabled for a dataset in test pyarrow/tests/…
tolleybot 6777353
Update python/pyarrow/_dataset_parquet_encryption.pyx
jorisvandenbossche 9df68d3
Merge pull request #4 from jorisvandenbossche/dataset-parquet-encrypt…
tolleybot d7a6b55
Update python/pyarrow/tests/test_dataset_encryption.py
tolleybot 22033b7
Run linter
anjakefala b323457
Update python/pyarrow/tests/test_dataset_encryption.py
jorisvandenbossche ced2ed2
Merge branch 'main' into dataset_encryption
anjakefala
The new C++ test file (216 added lines; the commit messages identify it as `file_parquet_encryption_test.cc` under `src/arrow/dataset/`):

```cpp
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <string_view>

#include "gtest/gtest.h"

#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_parquet.h>
#include "arrow/array.h"
#include "arrow/dataset/parquet_encryption_config.h"
#include "arrow/dataset/partition.h"
#include "arrow/filesystem/mockfs.h"
#include "arrow/io/api.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/type.h"
#include "parquet/arrow/reader.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/encryption.h"
#include "parquet/encryption/kms_client.h"
#include "parquet/encryption/test_in_memory_kms.h"

constexpr std::string_view kFooterKeyMasterKey = "0123456789012345";
constexpr std::string_view kFooterKeyMasterKeyId = "footer_key";
constexpr std::string_view kFooterKeyName = "footer_key";
constexpr std::string_view kColumnMasterKey = "1234567890123450";
constexpr std::string_view kColumnMasterKeyId = "col_key";
constexpr std::string_view kColumnKeyMapping = "col_key: a";
constexpr std::string_view kBaseDir = "";

using arrow::internal::checked_pointer_cast;

namespace arrow {
namespace dataset {

class DatasetEncryptionTest : public ::testing::Test {
 public:
  // This function creates a mock file system using the current time point, creates a
  // directory with the given base directory path, and writes a dataset to it using
  // provided Parquet file write options. The dataset is partitioned using a Hive
  // partitioning scheme. The function also checks if the written files exist in the
  // file system.
  static void SetUpTestSuite() {
    // Creates a mock file system using the current time point.
    EXPECT_OK_AND_ASSIGN(file_system_, fs::internal::MockFileSystem::Make(
                                           std::chrono::system_clock::now(), {}));
    ASSERT_OK(file_system_->CreateDir(std::string(kBaseDir)));

    // Prepare table data.
    auto table_schema = schema({field("a", int64()), field("c", int64()),
                                field("e", int64()), field("part", utf8())});
    table_ = TableFromJSON(table_schema, {R"([
        [ 0, 9, 1, "a" ],
        [ 1, 8, 2, "a" ],
        [ 2, 7, 1, "c" ],
        [ 3, 6, 2, "c" ],
        [ 4, 5, 1, "e" ],
        [ 5, 4, 2, "e" ],
        [ 6, 3, 1, "g" ],
        [ 7, 2, 2, "g" ],
        [ 8, 1, 1, "i" ],
        [ 9, 0, 2, "i" ]
      ])"});

    // Use a Hive-style partitioning scheme.
    partitioning_ = std::make_shared<HivePartitioning>(schema({field("part", utf8())}));

    // Prepare encryption properties.
    std::unordered_map<std::string, std::string> key_map;
    key_map.emplace(kColumnMasterKeyId, kColumnMasterKey);
    key_map.emplace(kFooterKeyMasterKeyId, kFooterKeyMasterKey);

    crypto_factory_ = std::make_shared<parquet::encryption::CryptoFactory>();
    auto kms_client_factory =
        std::make_shared<parquet::encryption::TestOnlyInMemoryKmsClientFactory>(
            /*wrap_locally=*/true, key_map);
    crypto_factory_->RegisterKmsClientFactory(std::move(kms_client_factory));
    kms_connection_config_ =
        std::make_shared<parquet::encryption::KmsConnectionConfig>();

    // Set write options with encryption configuration.
    auto encryption_config =
        std::make_shared<parquet::encryption::EncryptionConfiguration>(
            std::string(kFooterKeyName));
    encryption_config->column_keys = kColumnKeyMapping;
    auto parquet_encryption_config = std::make_shared<ParquetEncryptionConfig>();
    // Directly assign shared_ptr objects to ParquetEncryptionConfig members
    parquet_encryption_config->crypto_factory = crypto_factory_;
    parquet_encryption_config->kms_connection_config = kms_connection_config_;
    parquet_encryption_config->encryption_config = std::move(encryption_config);

    auto file_format = std::make_shared<ParquetFileFormat>();
    auto parquet_file_write_options = checked_pointer_cast<ParquetFileWriteOptions>(
        file_format->DefaultWriteOptions());
    parquet_file_write_options->parquet_encryption_config =
        std::move(parquet_encryption_config);

    // Write dataset.
    auto dataset = std::make_shared<InMemoryDataset>(table_);
    EXPECT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
    EXPECT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());

    FileSystemDatasetWriteOptions write_options;
    write_options.file_write_options = parquet_file_write_options;
    write_options.filesystem = file_system_;
    write_options.base_dir = kBaseDir;
    write_options.partitioning = partitioning_;
    write_options.basename_template = "part{i}.parquet";
    ASSERT_OK(FileSystemDataset::Write(write_options, std::move(scanner)));
  }

 protected:
  inline static std::shared_ptr<fs::FileSystem> file_system_;
  inline static std::shared_ptr<Table> table_;
  inline static std::shared_ptr<HivePartitioning> partitioning_;
  inline static std::shared_ptr<parquet::encryption::CryptoFactory> crypto_factory_;
  inline static std::shared_ptr<parquet::encryption::KmsConnectionConfig>
      kms_connection_config_;
};

// This test demonstrates the process of writing a partitioned Parquet file with the
// same encryption properties applied to each file within the dataset. The encryption
// properties are determined based on the selected columns. After writing the dataset,
// the test reads the data back and verifies that it can be successfully decrypted and
// scanned.
TEST_F(DatasetEncryptionTest, WriteReadDatasetWithEncryption) {
  // Create decryption properties.
  auto decryption_config =
      std::make_shared<parquet::encryption::DecryptionConfiguration>();
  auto parquet_decryption_config = std::make_shared<ParquetDecryptionConfig>();
  parquet_decryption_config->crypto_factory = crypto_factory_;
  parquet_decryption_config->kms_connection_config = kms_connection_config_;
  parquet_decryption_config->decryption_config = std::move(decryption_config);

  // Set scan options.
  auto parquet_scan_options = std::make_shared<ParquetFragmentScanOptions>();
  parquet_scan_options->parquet_decryption_config =
      std::move(parquet_decryption_config);

  auto file_format = std::make_shared<ParquetFileFormat>();
  file_format->default_fragment_scan_options = std::move(parquet_scan_options);

  // Get FileInfo objects for all files under the base directory
  fs::FileSelector selector;
  selector.base_dir = kBaseDir;
  selector.recursive = true;

  FileSystemFactoryOptions factory_options;
  factory_options.partitioning = partitioning_;
  factory_options.partition_base_dir = kBaseDir;
  ASSERT_OK_AND_ASSIGN(auto dataset_factory,
                       FileSystemDatasetFactory::Make(file_system_, selector,
                                                      file_format, factory_options));

  // Read dataset into table
  ASSERT_OK_AND_ASSIGN(auto dataset, dataset_factory->Finish());
  ASSERT_OK_AND_ASSIGN(auto scanner_builder, dataset->NewScan());
  ASSERT_OK_AND_ASSIGN(auto scanner, scanner_builder->Finish());
  ASSERT_OK_AND_ASSIGN(auto read_table, scanner->ToTable());

  // Verify the data was read correctly
  ASSERT_OK_AND_ASSIGN(auto combined_table, read_table->CombineChunks());
  // Validate the table
  ASSERT_OK(combined_table->ValidateFull());
  AssertTablesEqual(*combined_table, *table_);
}

// Read a single parquet file with and without decryption properties.
TEST_F(DatasetEncryptionTest, ReadSingleFile) {
  // Open the Parquet file.
  ASSERT_OK_AND_ASSIGN(auto input, file_system_->OpenInputFile("part=a/part0.parquet"));

  // Try to read metadata without providing decryption properties
  // when the footer is encrypted.
  ASSERT_THROW(parquet::ReadMetaData(input), parquet::ParquetException);

  // Create the ReaderProperties object using the FileDecryptionProperties object
  auto decryption_config =
      std::make_shared<parquet::encryption::DecryptionConfiguration>();
  auto file_decryption_properties = crypto_factory_->GetFileDecryptionProperties(
      *kms_connection_config_, *decryption_config);
  auto reader_properties = parquet::default_reader_properties();
  reader_properties.file_decryption_properties(file_decryption_properties);

  // Read entire file as a single Arrow table
  parquet::arrow::FileReaderBuilder reader_builder;
  ASSERT_OK(reader_builder.Open(input, reader_properties));
  ASSERT_OK_AND_ASSIGN(auto arrow_reader, reader_builder.Build());
  std::shared_ptr<Table> table;
  ASSERT_OK(arrow_reader->ReadTable(&table));

  // Check the contents of the table
  ASSERT_EQ(table->num_rows(), 2);
  ASSERT_EQ(table->num_columns(), 3);
  ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(0)->chunk(0))->GetView(0), 0);
  ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(1)->chunk(0))->GetView(0), 9);
  ASSERT_EQ(checked_pointer_cast<Int64Array>(table->column(2)->chunk(0))->GetView(0), 1);
}

}  // namespace dataset
}  // namespace arrow
```