Skip to content

Commit

Permalink
Start encryption
Browse files Browse the repository at this point in the history
  • Loading branch information
rok committed Feb 11, 2025
1 parent 9ed8529 commit 877ca0c
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 4 deletions.
27 changes: 24 additions & 3 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1127,7 +1127,9 @@ mod tests {

use std::fs::File;

use crate::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use crate::arrow::arrow_reader::{
ArrowReaderOptions, ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder,
};
use crate::arrow::ARROW_SCHEMA_META_KEY;
use arrow::datatypes::ToByteSlice;
use arrow::datatypes::{DataType, Schema};
Expand All @@ -1141,6 +1143,8 @@ mod tests {

use crate::basic::Encoding;
use crate::data_type::AsBytes;
use crate::encryption::decryption::FileDecryptionProperties;
use crate::encryption::encryption::FileEncryptionProperties;
use crate::file::metadata::ParquetMetaData;
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::read_offset_indexes;
Expand Down Expand Up @@ -3546,8 +3550,14 @@ mod tests {
let file = tempfile::tempfile().unwrap();

// Write the file with encryption enabled, using a 16-byte footer key
let key_code: &[u8] = "0123456789012345".as_bytes();
let file_encryption_properties = FileEncryptionProperties::builder(key_code.to_vec())
.build()
.unwrap();

let props = WriterProperties::builder()
.set_max_row_group_size(200)
.with_file_encryption_properties(file_encryption_properties)
.build();

let mut writer =
Expand All @@ -3560,8 +3570,19 @@ mod tests {

writer.close().unwrap();

// todo: try_new_with_decryption
let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
let footer_key = "0123456789012345".as_bytes();
let column_key = "1234567890123450".as_bytes();

let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
.with_column_key("int".as_bytes().to_vec(), column_key.to_vec())
.build()
.unwrap();

let options =
ArrowReaderOptions::new().with_file_decryption_properties(decryption_properties);

// todo: remove with_file_decryption_properties from things that are not ArrowReaderOptions
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
assert_eq!(&row_group_sizes(builder.metadata()), &[200, 200, 50]);

let batches = builder
Expand Down
27 changes: 27 additions & 0 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1535,6 +1535,8 @@ mod tests {
page::PageReader,
reader::{get_column_reader, get_typed_column_reader, ColumnReaderImpl},
};
#[cfg(feature = "encryption")]
use crate::encryption::encryption::FileEncryptionProperties;
use crate::file::writer::TrackedWrite;
use crate::file::{
properties::ReaderProperties, reader::SerializedPageReader, writer::SerializedPageWriter,
Expand Down Expand Up @@ -3379,6 +3381,31 @@ mod tests {
);
}

/// Smoke test: a `SerializedFileWriter` can be constructed from writer
/// properties that carry file encryption properties.
#[cfg(feature = "encryption")]
#[test]
fn test_encryption_writer() {
    // Schema with a single optional UTF8 byte-array column named `a`.
    let message_type = "\nmessage test_schema {\nOPTIONAL BYTE_ARRAY a (UTF8);\n}\n";
    let schema = Arc::new(parse_message_type(message_type).unwrap());
    let file: File = tempfile::tempfile().unwrap();

    // 16-byte footer key used to build the encryption properties.
    let footer_key = b"0123456789012345".to_vec();
    let encryption = FileEncryptionProperties::builder(footer_key)
        .build()
        .unwrap();
    let props = Arc::new(
        WriterProperties::builder()
            .with_file_encryption_properties(encryption)
            .build(),
    );

    // Only construction is exercised here; nothing is written or asserted.
    let _writer = SerializedFileWriter::new(&file, schema, props).unwrap();
}

#[test]
fn test_increment_max_binary_chars() {
let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);
Expand Down
80 changes: 79 additions & 1 deletion parquet/src/encryption/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
// under the License.

use crate::errors::Result;
use ring::aead::{Aad, LessSafeKey, UnboundKey, AES_128_GCM};
use ring::aead::{Aad, LessSafeKey, NonceSequence, UnboundKey, AES_128_GCM};
use ring::rand::{SecureRandom, SystemRandom};
use std::fmt::Debug;

/// Bit mask selecting the low 96 bits (12 bytes) of a `u128`; used to restrict
/// the nonce counter to the bytes that are actually emitted as a nonce.
const RIGHT_TWELVE: u128 = 0x0000_0000_ffff_ffff_ffff_ffff_ffff_ffff;
/// Length in bytes of a nonce produced by the nonce counter.
const NONCE_LEN: usize = 12;
// NOTE(review): presumably the AES-GCM authentication tag length in bytes;
// its use is not visible in this part of the file — confirm.
const TAG_LEN: usize = 16;
// NOTE(review): presumably the width in bytes of a length prefix on encrypted
// buffers; its use is not visible in this part of the file — confirm.
const SIZE_LEN: usize = 4;
Expand Down Expand Up @@ -60,3 +62,79 @@ impl BlockDecryptor for RingGcmBlockDecryptor {
Ok(result)
}
}

/// Encrypts one buffer at a time.
///
/// Implementors must also be `Debug`, `Send` and `Sync`.
pub trait BlockEncryptor: Debug + Send + Sync {
    /// Encrypts `plaintext` together with `aad` and returns the resulting bytes.
    ///
    /// Takes `&mut self`, so an implementation may update internal state on
    /// every call.
    // NOTE(review): `aad` is presumably the AES-GCM additional authenticated
    // data, and the layout of the returned buffer is not defined here — confirm
    // both against the matching `BlockDecryptor`.
    fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec<u8>;
}

/// Counter-based nonce generator.
///
/// Only the low 96 bits (`RIGHT_TWELVE`) of the fields are significant: they
/// are what is emitted as nonce bytes and what is compared to detect
/// wrap-around.
#[derive(Debug, Clone)]
struct CounterNonce {
    /// Randomly chosen starting point; the sequence is exhausted once
    /// `counter` comes back around to this value.
    start: u128,
    /// Value that will be emitted as the next nonce.
    counter: u128,
}

impl CounterNonce {
    /// Seeds a new counter from `rng`.
    ///
    /// Sixteen random bytes are drawn and reduced to their low 96 bits to form
    /// `start`; `counter` begins one past `start`.
    ///
    /// # Panics
    ///
    /// Panics if `rng` fails to produce random bytes.
    pub fn new(rng: &SystemRandom) -> Self {
        let mut seed = [0; 16];
        rng.fill(&mut seed).unwrap();

        // The seed is random, so its byte order is irrelevant and the
        // platform-native order is as good as any.
        let start = RIGHT_TWELVE & u128::from_ne_bytes(seed);

        Self {
            start,
            counter: start.wrapping_add(1),
        }
    }

    /// Returns the low `NONCE_LEN` bytes of the counter in little-endian order.
    ///
    /// This is the single place where the counter is turned into bytes, so the
    /// endianness cannot be flipped by accident elsewhere.
    #[inline]
    pub fn get_bytes(&self) -> [u8; NONCE_LEN] {
        let mut nonce = [0u8; NONCE_LEN];
        nonce.copy_from_slice(&self.counter.to_le_bytes()[..NONCE_LEN]);
        nonce
    }
}

impl NonceSequence for CounterNonce {
fn advance(&mut self) -> Result<ring::aead::Nonce, ring::error::Unspecified> {
// If we've wrapped around, we've exhausted this nonce sequence
if (self.counter & RIGHT_TWELVE) == (self.start & RIGHT_TWELVE) {
Err(ring::error::Unspecified)
} else {
// Otherwise, just advance and return the new value
let buf: [u8; NONCE_LEN] = self.get_bytes();
self.counter = self.counter.wrapping_add(1);
Ok(ring::aead::Nonce::assume_unique_for_key(buf))
}
}
}

/// Block encryptor backed by a `ring` AEAD key and a counter-based nonce
/// sequence.
#[derive(Debug, Clone)]
pub(crate) struct RingGcmBlockEncryptor {
    /// The AEAD key used for sealing.
    key: LessSafeKey,
    /// Source of the nonces; seeded randomly when the encryptor is created.
    nonce_sequence: CounterNonce,
}

impl RingGcmBlockEncryptor {
    // todo TBD: some KMS systems produce data keys, need to be able to pass them to Encryptor.
    // todo TBD: for other KMSs, we will create data keys inside arrow-rs, making sure to use SystemRandom
    /// Create a new `RingGcmBlockEncryptor` with a given key and random nonce.
    /// The nonce will advance appropriately with each block encryption and
    /// return an error if it wraps around.
    ///
    /// # Panics
    ///
    /// Panics if `key_bytes` is not accepted by `ring` as an AES-128-GCM key
    /// (i.e. it is not exactly 16 bytes long), or if the system random number
    /// generator fails while seeding the nonce sequence.
    pub(crate) fn new(key_bytes: &[u8]) -> Self {
        // todo support other key sizes
        let key = UnboundKey::new(&AES_128_GCM, key_bytes)
            .expect("encryption key must be a valid AES-128-GCM key (16 bytes)");
        let nonce_sequence = CounterNonce::new(&SystemRandom::new());

        Self {
            key: LessSafeKey::new(key),
            nonce_sequence,
        }
    }
}

impl BlockEncryptor for RingGcmBlockEncryptor {
    /// Not implemented yet: every call panics via `todo!()`.
    // NOTE(review): `plaintext` and `aad` are unused until this stub is filled
    // in; the output layout must match what the block decryptor expects.
    fn encrypt(&mut self, plaintext: &[u8], aad: &[u8]) -> Vec<u8> {
        todo!()
    }
}
82 changes: 82 additions & 0 deletions parquet/src/encryption/encryption.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;
use std::sync::Arc;
use crate::encryption::ciphers::{RingGcmBlockEncryptor, BlockEncryptor};

/// Configuration describing how a file should be encrypted.
///
/// Constructed through [`FileEncryptionProperties::builder`].
#[derive(Debug, Clone)]
pub struct FileEncryptionProperties {
    /// Always set to `true` by the builder at present.
    encrypt_footer: bool,
    /// Key bytes from which the footer encryptor is created.
    footer_key: Vec<u8>,
    // NOTE(review): presumably maps a column name/path to that column's key;
    // the builder currently always leaves this as `None` — confirm intent.
    column_keys: Option<HashMap<Vec<u8>, Vec<u8>>>,
    // NOTE(review): presumably an optional AAD prefix; the builder currently
    // always leaves this as `None` — confirm intent.
    aad_prefix: Option<Vec<u8>>,
}

impl FileEncryptionProperties {
    /// Returns a builder for `FileEncryptionProperties` that uses `footer_key`
    /// as the footer key.
    pub fn builder(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
        EncryptionPropertiesBuilder::new(footer_key)
    }
}

/// Builder for [`FileEncryptionProperties`].
pub struct EncryptionPropertiesBuilder {
    /// Footer key supplied at construction time.
    footer_key: Vec<u8>,
    // NOTE(review): no setter for this field is visible yet; it starts as `None`.
    column_keys: Option<HashMap<Vec<u8>, Vec<u8>>>,
    // NOTE(review): no setter for this field is visible yet; it starts as `None`.
    aad_prefix: Option<Vec<u8>>,
}

impl EncryptionPropertiesBuilder {
    /// Key length in bytes accepted for the footer key. The footer encryptor
    /// is created with AES-128-GCM only, so other sizes cannot be used yet.
    const SUPPORTED_KEY_LEN: usize = 16;

    /// Creates a builder that will use `footer_key` as the footer key, with no
    /// column keys and no AAD prefix.
    pub fn new(footer_key: Vec<u8>) -> EncryptionPropertiesBuilder {
        Self {
            footer_key,
            column_keys: None,
            aad_prefix: None,
        }
    }

    /// Consumes the builder and produces the [`FileEncryptionProperties`].
    ///
    /// Footer encryption is always enabled in the result.
    ///
    /// # Errors
    ///
    /// Returns an error if the footer key is not exactly 16 bytes long.
    /// Rejecting it here avoids a panic later, when the footer encryptor is
    /// created from the key.
    pub fn build(self) -> crate::errors::Result<FileEncryptionProperties> {
        if self.footer_key.len() != Self::SUPPORTED_KEY_LEN {
            return Err(crate::errors::ParquetError::General(format!(
                "Invalid footer key length: expected {} bytes, got {}",
                Self::SUPPORTED_KEY_LEN,
                self.footer_key.len()
            )));
        }

        Ok(FileEncryptionProperties {
            encrypt_footer: true,
            footer_key: self.footer_key,
            column_keys: self.column_keys,
            aad_prefix: self.aad_prefix,
        })
    }
}

/// Per-file encryption state: the configured properties, the footer encryptor
/// and the file AAD.
#[derive(Clone, Debug)]
pub struct FileEncryptor {
    /// The properties this encryptor was created from.
    encryption_properties: FileEncryptionProperties,
    /// Encryptor created from the footer key; `new` always populates it.
    // NOTE(review): `BlockEncryptor::encrypt` takes `&mut self`, which cannot
    // be called through a shared `Arc` without `Arc::get_mut` or interior
    // mutability — confirm the intended ownership model.
    footer_encryptor: Option<Arc<dyn BlockEncryptor>>,
    /// AAD prefix bytes followed by the file-unique AAD bytes.
    file_aad: Vec<u8>,
}

impl FileEncryptor {
pub(crate) fn new(
encryption_properties: FileEncryptionProperties,
aad_file_unique: Vec<u8>,
aad_prefix: Vec<u8>,
) -> Self {
let file_aad = [aad_prefix.as_slice(), aad_file_unique.as_slice()].concat();
let footer_encryptor = RingGcmBlockEncryptor::new(&encryption_properties.footer_key);
Self {
encryption_properties,
footer_encryptor: Some(Arc::new(footer_encryptor)),
file_aad,
}
}
}
1 change: 1 addition & 0 deletions parquet/src/encryption/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@
pub mod ciphers;
pub mod decryption;
pub mod encryption;
pub mod modules;
23 changes: 23 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
//! Configuration via [`WriterProperties`] and [`ReaderProperties`]
use crate::basic::{Compression, Encoding};
use crate::compression::{CodecOptions, CodecOptionsBuilder};
#[cfg(feature = "encryption")]
use crate::encryption::encryption::FileEncryptionProperties;
use crate::file::metadata::KeyValue;
use crate::format::SortingColumn;
use crate::schema::types::ColumnPath;
Expand Down Expand Up @@ -372,6 +374,11 @@ impl WriterProperties {
.and_then(|c| c.bloom_filter_properties())
.or_else(|| self.default_column_properties.bloom_filter_properties())
}

/// Returns the file encryption properties, or `None` if none were set.
///
/// Only available when the `encryption` feature is enabled.
#[cfg(feature = "encryption")]
pub fn file_encryption_properties(&self) -> Option<&FileEncryptionProperties> {
    self.file_encryption_properties.as_ref()
}
}

/// Builder for [`WriterProperties`] parquet writer configuration.
Expand All @@ -394,6 +401,8 @@ pub struct WriterPropertiesBuilder {
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
coerce_types: bool,
#[cfg(feature = "encryption")]
file_encryption_properties: Option<FileEncryptionProperties>,
}

impl WriterPropertiesBuilder {
Expand All @@ -416,6 +425,8 @@ impl WriterPropertiesBuilder {
column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
coerce_types: DEFAULT_COERCE_TYPES,
#[cfg(feature = "encryption")]
file_encryption_properties: None,
}
}

Expand All @@ -438,6 +449,8 @@ impl WriterPropertiesBuilder {
column_index_truncate_length: self.column_index_truncate_length,
statistics_truncate_length: self.statistics_truncate_length,
coerce_types: self.coerce_types,
#[cfg(feature = "encryption")]
file_encryption_properties: self.file_encryption_properties,
}
}

Expand Down Expand Up @@ -810,6 +823,16 @@ impl WriterPropertiesBuilder {
self.coerce_types = coerce_types;
self
}

/// Sets the [`FileEncryptionProperties`] for the writer properties being
/// built, replacing any previously set value.
///
/// Only available when the `encryption` feature is enabled.
#[cfg(feature = "encryption")]
pub fn with_file_encryption_properties(
    mut self,
    file_encryption_properties: FileEncryptionProperties,
) -> Self {
    self.file_encryption_properties = Some(file_encryption_properties);
    self
}
}

/// Controls the level of statistics to be computed by the writer and stored in
Expand Down
5 changes: 5 additions & 0 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::column::{
writer::{get_column_writer, ColumnWriter},
};
use crate::data_type::DataType;
use crate::encryption::ciphers::RingGcmBlockEncryptor;
use crate::encryption::encryption::FileEncryptor;
use crate::errors::{ParquetError, Result};
use crate::file::properties::{BloomFilterPosition, WriterPropertiesPtr};
use crate::file::reader::ChunkReader;
Expand Down Expand Up @@ -523,6 +525,9 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
) -> Result<C>,
{
self.assert_previous_writer_closed()?;
let file_encryption_properties = self.props.file_encryption_properties();
let file_encryptor =
FileEncryptor::new(file_encryption_properties.unwrap().clone(), vec![], vec![]);
Ok(match self.next_column_desc() {
Some(column) => {
let props = self.props.clone();
Expand Down

0 comments on commit 877ca0c

Please sign in to comment.