diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index e1130bb2d5..cb9ff9a8d5 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -20,6 +20,7 @@ use std::sync::Arc;
 use futures::{StreamExt, TryStreamExt};
 
 use crate::arrow::ArrowReader;
+use crate::arrow::reader::ParquetReadOptions;
 use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
 use crate::io::FileIO;
 use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile};
@@ -63,10 +64,9 @@ impl BasicDeleteFileLoader {
         let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder(
             data_file_path,
             self.file_io.clone(),
-            false,
-            None,
             None,
             file_size_in_bytes,
+            ParquetReadOptions::builder().build(),
         )
         .await?
         .build()?
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 90bedae2c6..128d9703ef 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -43,6 +43,7 @@ use parquet::file::metadata::{
     PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData,
 };
 use parquet::schema::types::{SchemaDescriptor, Type as ParquetType};
+use typed_builder::TypedBuilder;
 
 use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader;
 use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder;
@@ -60,6 +61,78 @@ use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type};
 use crate::utils::available_parallelism;
 use crate::{Error, ErrorKind};
 
+/// Default gap between byte ranges below which they are coalesced into a
+/// single request. Matches object_store's `OBJECT_STORE_COALESCE_DEFAULT`.
+const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024;
+
+/// Default maximum number of coalesced byte ranges fetched concurrently.
+/// Matches object_store's `OBJECT_STORE_COALESCE_PARALLEL`.
+const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10;
+
+/// Default number of bytes to prefetch when parsing Parquet footer metadata.
+/// Matches DataFusion's default `ParquetOptions::metadata_size_hint`.
+const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024;
+
+/// Options for tuning Parquet file I/O.
+#[derive(Clone, Copy, Debug, TypedBuilder)]
+#[builder(field_defaults(setter(prefix = "with_")))]
+pub(crate) struct ParquetReadOptions {
+    /// Number of bytes to prefetch for parsing the Parquet metadata.
+    ///
+    /// This hint can help reduce the number of fetch requests. For more details see the
+    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
+    ///
+    /// Defaults to 512 KiB, matching DataFusion's default `ParquetOptions::metadata_size_hint`.
+    #[builder(default = Some(DEFAULT_METADATA_SIZE_HINT))]
+    pub(crate) metadata_size_hint: Option<usize>,
+    /// Gap threshold for merging nearby byte ranges into a single request.
+    /// Ranges with gaps smaller than this value will be coalesced.
+    ///
+    /// Defaults to 1 MiB, matching object_store's `OBJECT_STORE_COALESCE_DEFAULT`.
+    #[builder(default = DEFAULT_RANGE_COALESCE_BYTES)]
+    pub(crate) range_coalesce_bytes: u64,
+    /// Maximum number of merged byte ranges to fetch concurrently.
+    ///
+    /// Defaults to 10, matching object_store's `OBJECT_STORE_COALESCE_PARALLEL`.
+    #[builder(default = DEFAULT_RANGE_FETCH_CONCURRENCY)]
+    pub(crate) range_fetch_concurrency: usize,
+    /// Whether to preload the column index when reading Parquet metadata.
+    #[builder(default = true)]
+    pub(crate) preload_column_index: bool,
+    /// Whether to preload the offset index when reading Parquet metadata.
+    #[builder(default = true)]
+    pub(crate) preload_offset_index: bool,
+    /// Whether to preload the page index when reading Parquet metadata.
+    #[builder(default = false)]
+    pub(crate) preload_page_index: bool,
+}
+
+impl ParquetReadOptions {
+    pub(crate) fn metadata_size_hint(&self) -> Option<usize> {
+        self.metadata_size_hint
+    }
+
+    pub(crate) fn range_coalesce_bytes(&self) -> u64 {
+        self.range_coalesce_bytes
+    }
+
+    pub(crate) fn range_fetch_concurrency(&self) -> usize {
+        self.range_fetch_concurrency
+    }
+
+    pub(crate) fn preload_column_index(&self) -> bool {
+        self.preload_column_index
+    }
+
+    pub(crate) fn preload_offset_index(&self) -> bool {
+        self.preload_offset_index
+    }
+
+    pub(crate) fn preload_page_index(&self) -> bool {
+        self.preload_page_index
+    }
+}
+
 /// Builder to create ArrowReader
 pub struct ArrowReaderBuilder {
     batch_size: Option<usize>,
@@ -67,7 +140,7 @@ pub struct ArrowReaderBuilder {
     concurrency_limit_data_files: usize,
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
-    metadata_size_hint: Option<usize>,
+    parquet_read_options: ParquetReadOptions,
 }
 
 impl ArrowReaderBuilder {
@@ -81,7 +154,7 @@ impl ArrowReaderBuilder {
             concurrency_limit_data_files: num_cpus,
             row_group_filtering_enabled: true,
             row_selection_enabled: false,
-            metadata_size_hint: None,
+            parquet_read_options: ParquetReadOptions::builder().build(),
         }
     }
 
@@ -115,7 +188,24 @@ impl ArrowReaderBuilder {
     /// This hint can help reduce the number of fetch requests. For more details see the
     /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
     pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self {
-        self.metadata_size_hint = Some(metadata_size_hint);
+        self.parquet_read_options.metadata_size_hint = Some(metadata_size_hint);
+        self
+    }
+
+    /// Sets the gap threshold for merging nearby byte ranges into a single request.
+    /// Ranges with gaps smaller than this value will be coalesced.
+    ///
+    /// Defaults to 1 MiB, matching object_store's `OBJECT_STORE_COALESCE_DEFAULT`.
+    pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self {
+        self.parquet_read_options.range_coalesce_bytes = range_coalesce_bytes;
+        self
+    }
+
+    /// Sets the maximum number of merged byte ranges to fetch concurrently.
+    ///
+    /// Defaults to 10, matching object_store's `OBJECT_STORE_COALESCE_PARALLEL`.
+    pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self {
+        self.parquet_read_options.range_fetch_concurrency = range_fetch_concurrency;
         self
     }
 
@@ -131,7 +221,7 @@
             concurrency_limit_data_files: self.concurrency_limit_data_files,
             row_group_filtering_enabled: self.row_group_filtering_enabled,
             row_selection_enabled: self.row_selection_enabled,
-            metadata_size_hint: self.metadata_size_hint,
+            parquet_read_options: self.parquet_read_options,
         }
     }
 }
@@ -148,7 +238,7 @@ pub struct ArrowReader {
 
     row_group_filtering_enabled: bool,
     row_selection_enabled: bool,
-    metadata_size_hint: Option<usize>,
+    parquet_read_options: ParquetReadOptions,
 }
 
 impl ArrowReader {
@@ -160,7 +250,7 @@ impl ArrowReader {
         let concurrency_limit_data_files = self.concurrency_limit_data_files;
         let row_group_filtering_enabled = self.row_group_filtering_enabled;
         let row_selection_enabled = self.row_selection_enabled;
-        let metadata_size_hint = self.metadata_size_hint;
+        let parquet_read_options = self.parquet_read_options;
 
         // Fast-path for single concurrency to avoid overhead of try_flatten_unordered
         let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 {
@@ -176,7 +266,7 @@ impl ArrowReader {
                         self.delete_file_loader.clone(),
                         row_group_filtering_enabled,
                         row_selection_enabled,
-                        metadata_size_hint,
+                        parquet_read_options,
                     )
                 })
                 .map_err(|err| {
@@ -198,7 +288,7 @@ impl ArrowReader {
                         self.delete_file_loader.clone(),
                         row_group_filtering_enabled,
                         row_selection_enabled,
-                        metadata_size_hint,
+                        parquet_read_options,
                     )
                 })
                 .map_err(|err| {
@@ -213,7 +303,6 @@ impl ArrowReader {
         Ok(stream)
     }
 
-    #[allow(clippy::too_many_arguments)]
     async fn process_file_scan_task(
         task: FileScanTask,
         batch_size: Option<usize>,
@@ -221,10 +310,12 @@ impl ArrowReader {
         delete_file_loader: CachingDeleteFileLoader,
         row_group_filtering_enabled: bool,
         row_selection_enabled: bool,
-        metadata_size_hint: Option<usize>,
+        parquet_read_options: ParquetReadOptions,
     ) -> Result<ArrowRecordBatchStream> {
         let should_load_page_index =
             (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty();
+        let mut parquet_read_options = parquet_read_options;
+        parquet_read_options.preload_page_index = should_load_page_index;
 
         let delete_filter_rx =
             delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema));
@@ -234,10 +325,9 @@ impl ArrowReader {
         let initial_stream_builder = Self::create_parquet_record_batch_stream_builder(
             &task.data_file_path,
             file_io.clone(),
-            should_load_page_index,
             None,
-            metadata_size_hint,
             task.file_size_in_bytes,
+            parquet_read_options,
         )
         .await?;
 
@@ -288,10 +378,9 @@ impl ArrowReader {
             Self::create_parquet_record_batch_stream_builder(
                 &task.data_file_path,
                 file_io.clone(),
-                should_load_page_index,
                 Some(options),
-                metadata_size_hint,
                 task.file_size_in_bytes,
+                parquet_read_options,
             )
             .await?
         } else {
@@ -493,28 +582,21 @@ impl ArrowReader {
     pub(crate) async fn create_parquet_record_batch_stream_builder(
         data_file_path: &str,
         file_io: FileIO,
-        should_load_page_index: bool,
         arrow_reader_options: Option<ArrowReaderOptions>,
-        metadata_size_hint: Option<usize>,
         file_size_in_bytes: u64,
+        parquet_read_options: ParquetReadOptions,
     ) -> Result<ParquetRecordBatchStreamBuilder<ArrowFileReader>> {
         // Get the metadata for the Parquet file we need to read and build
         // a reader for the data within
         let parquet_file = file_io.new_input(data_file_path)?;
         let parquet_reader = parquet_file.reader().await?;
-        let mut parquet_file_reader = ArrowFileReader::new(
+        let parquet_file_reader = ArrowFileReader::new(
             FileMetadata {
                 size: file_size_in_bytes,
             },
             parquet_reader,
         )
-        .with_preload_column_index(true)
-        .with_preload_offset_index(true)
-        .with_preload_page_index(should_load_page_index);
-
-        if let Some(hint) = metadata_size_hint {
-            parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint);
-        }
+        .with_parquet_read_options(parquet_read_options);
 
         // Create the record batch stream builder, which wraps the parquet file reader
         let options = arrow_reader_options.unwrap_or_default();
@@ -1706,10 +1788,7 @@ impl BoundPredicateVisitor for PredicateConverter<'_> {
 /// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader.
 pub struct ArrowFileReader {
     meta: FileMetadata,
-    preload_column_index: bool,
-    preload_offset_index: bool,
-    preload_page_index: bool,
-    metadata_size_hint: Option<usize>,
+    parquet_read_options: ParquetReadOptions,
     r: Box<dyn FileRead>,
 }
 
@@ -1718,38 +1797,14 @@ impl ArrowFileReader {
     pub fn new(meta: FileMetadata, r: Box<dyn FileRead>) -> Self {
         Self {
             meta,
-            preload_column_index: false,
-            preload_offset_index: false,
-            preload_page_index: false,
-            metadata_size_hint: None,
+            parquet_read_options: ParquetReadOptions::builder().build(),
             r,
         }
     }
 
-    /// Enable or disable preloading of the column index
-    pub fn with_preload_column_index(mut self, preload: bool) -> Self {
-        self.preload_column_index = preload;
-        self
-    }
-
-    /// Enable or disable preloading of the offset index
-    pub fn with_preload_offset_index(mut self, preload: bool) -> Self {
-        self.preload_offset_index = preload;
-        self
-    }
-
-    /// Enable or disable preloading of the page index
-    pub fn with_preload_page_index(mut self, preload: bool) -> Self {
-        self.preload_page_index = preload;
-        self
-    }
-
-    /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata
-    ///
-    /// This hint can help reduce the number of fetch requests. For more details see the
-    /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint).
-    pub fn with_metadata_size_hint(mut self, hint: usize) -> Self {
-        self.metadata_size_hint = Some(hint);
+    /// Configure all Parquet read options.
+    pub(crate) fn with_parquet_read_options(mut self, options: ParquetReadOptions) -> Self {
+        self.parquet_read_options = options;
         self
     }
 }
@@ -1763,6 +1818,49 @@ impl AsyncFileReader for ArrowFileReader {
         )
     }
 
+    /// Override the default `get_byte_ranges`, which calls `get_bytes` sequentially.
+    /// The parquet reader calls this to fetch the column chunks for a row group, so
+    /// without this override each column chunk is a serial round-trip to object storage.
+    /// Adapted from object_store's `coalesce_ranges` in `util.rs`.
+    fn get_byte_ranges(
+        &mut self,
+        ranges: Vec<Range<u64>>,
+    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
+        let coalesce_bytes = self.parquet_read_options.range_coalesce_bytes();
+        let concurrency = self.parquet_read_options.range_fetch_concurrency().max(1);
+
+        async move {
+            // Merge nearby ranges to reduce the number of object store requests.
+            let fetch_ranges = merge_ranges(&ranges, coalesce_bytes);
+            let r = &self.r;
+
+            // Fetch merged ranges concurrently.
+            let fetched: Vec<Bytes> = futures::stream::iter(fetch_ranges.iter().cloned())
+                .map(|range| async move {
+                    r.read(range)
+                        .await
+                        .map_err(|e| parquet::errors::ParquetError::External(Box::new(e)))
+                })
+                .buffered(concurrency)
+                .try_collect()
+                .await?;
+
+            // Slice the fetched data back into the originally requested ranges.
+            Ok(ranges
+                .iter()
+                .map(|range| {
+                    let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1;
+                    let fetch_range = &fetch_ranges[idx];
+                    let fetch_bytes = &fetched[idx];
+                    let start = (range.start - fetch_range.start) as usize;
+                    let end = (range.end - fetch_range.start) as usize;
+                    fetch_bytes.slice(start..end.min(fetch_bytes.len()))
+                })
+                .collect())
+        }
+        .boxed()
+    }
+
     // TODO: currently we don't respect `ArrowReaderOptions` cause it don't expose any method to access the option field
     // we will fix it after `v55.1.0` is released in https://github.com/apache/arrow-rs/issues/7393
     fn get_metadata(
@@ -1771,11 +1869,17 @@
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
         async move {
             let reader = ParquetMetaDataReader::new()
-                .with_prefetch_hint(self.metadata_size_hint)
+                .with_prefetch_hint(self.parquet_read_options.metadata_size_hint())
                 // Set the page policy first because it updates both column and offset policies.
-                .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index))
-                .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index))
-                .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index));
+                .with_page_index_policy(PageIndexPolicy::from(
+                    self.parquet_read_options.preload_page_index(),
+                ))
+                .with_column_index_policy(PageIndexPolicy::from(
+                    self.parquet_read_options.preload_column_index(),
+                ))
+                .with_offset_index_policy(PageIndexPolicy::from(
+                    self.parquet_read_options.preload_offset_index(),
+                ));
 
             let size = self.meta.size;
             let meta = reader.load_and_finish(self, size).await?;
@@ -1785,6 +1889,42 @@
     }
 }
 
+/// Merge overlapping or nearby byte ranges, combining ranges with gaps <= `coalesce` bytes.
+/// Adapted from object_store's `merge_ranges` in `util.rs`.
+fn merge_ranges(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
+    if ranges.is_empty() {
+        return vec![];
+    }
+
+    let mut ranges = ranges.to_vec();
+    ranges.sort_unstable_by_key(|r| r.start);
+
+    let mut merged = Vec::with_capacity(ranges.len());
+    let mut start_idx = 0;
+    let mut end_idx = 1;
+
+    while start_idx != ranges.len() {
+        let mut range_end = ranges[start_idx].end;
+
+        while end_idx != ranges.len()
+            && ranges[end_idx]
+                .start
+                .checked_sub(range_end)
+                .map(|delta| delta <= coalesce)
+                .unwrap_or(true)
+        {
+            range_end = range_end.max(ranges[end_idx].end);
+            end_idx += 1;
+        }
+
+        merged.push(ranges[start_idx].start..range_end);
+        start_idx = end_idx;
+        end_idx += 1;
+    }
+
+    merged
+}
+
 /// The Arrow type of an array that the Parquet reader reads may not match the exact Arrow type
 /// that Iceberg uses for literals - but they are effectively the same logical type,
 /// i.e. LargeUtf8 and Utf8 or Utf8View and Utf8 or Utf8View and LargeUtf8.
@@ -1810,6 +1950,7 @@ fn try_cast_literal(
 mod tests {
     use std::collections::{HashMap, HashSet};
     use std::fs::File;
+    use std::ops::Range;
     use std::sync::Arc;
 
     use arrow_array::cast::AsArray;
@@ -4317,4 +4458,209 @@ message schema {
         assert_eq!(name_col.value(2), "Charlie");
         assert_eq!(name_col.value(3), "Dave");
     }
+
+    #[test]
+    fn test_merge_ranges_empty() {
+        assert_eq!(super::merge_ranges(&[], 1024), Vec::<Range<u64>>::new());
+    }
+
+    #[test]
+    fn test_merge_ranges_no_coalesce() {
+        // Ranges far apart should not be merged
+        let ranges = vec![0..100, 1_000_000..1_000_100];
+        let merged = super::merge_ranges(&ranges, 1024);
+        assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]);
+    }
+
+    #[test]
+    fn test_merge_ranges_coalesce() {
+        // Ranges within the gap threshold should be merged
+        let ranges = vec![0..100, 200..300, 500..600];
+        let merged = super::merge_ranges(&ranges, 1024);
+        assert_eq!(merged, vec![0..600]);
+    }
+
+    #[test]
+    fn test_merge_ranges_overlapping() {
+        let ranges = vec![0..200, 100..300];
+        let merged = super::merge_ranges(&ranges, 0);
+        assert_eq!(merged, vec![0..300]);
+    }
+
+    #[test]
+    fn test_merge_ranges_unsorted() {
+        let ranges = vec![500..600, 0..100, 200..300];
+        let merged = super::merge_ranges(&ranges, 1024);
+        assert_eq!(merged, vec![0..600]);
+    }
+
+    /// Mock FileRead backed by a flat byte buffer.
+    struct MockFileRead {
+        data: bytes::Bytes,
+    }
+
+    impl MockFileRead {
+        fn new(size: usize) -> Self {
+            // Fill with sequential byte values so slices are verifiable.
+            let data: Vec<u8> = (0..size).map(|i| (i % 256) as u8).collect();
+            Self {
+                data: bytes::Bytes::from(data),
+            }
+        }
+    }
+
+    #[async_trait::async_trait]
+    impl crate::io::FileRead for MockFileRead {
+        async fn read(&self, range: Range<u64>) -> crate::Result<bytes::Bytes> {
+            Ok(self.data.slice(range.start as usize..range.end as usize))
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_byte_ranges_no_coalesce() {
+        use parquet::arrow::async_reader::AsyncFileReader;
+
+        let mock = MockFileRead::new(2048);
+        let expected_0 = mock.data.slice(0..100);
+        let expected_1 = mock.data.slice(1500..1600);
+
+        let mut reader =
+            super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
+                .with_parquet_read_options(
+                    super::ParquetReadOptions::builder()
+                        .with_range_coalesce_bytes(0)
+                        .build(),
+                );
+
+        let result = reader
+            .get_byte_ranges(vec![0..100, 1500..1600])
+            .await
+            .unwrap();
+
+        assert_eq!(result.len(), 2);
+        assert_eq!(result[0], expected_0);
+        assert_eq!(result[1], expected_1);
+    }
+
+    #[tokio::test]
+    async fn test_get_byte_ranges_with_coalesce() {
+        use parquet::arrow::async_reader::AsyncFileReader;
+
+        let mock = MockFileRead::new(1024);
+        let expected_0 = mock.data.slice(0..100);
+        let expected_1 = mock.data.slice(200..300);
+        let expected_2 = mock.data.slice(500..600);
+
+        let mut reader =
+            super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock))
+                .with_parquet_read_options(
+                    super::ParquetReadOptions::builder()
+                        .with_range_coalesce_bytes(1024)
+                        .build(),
+                );
+
+        // All ranges fall within the coalesce threshold, so they should merge into one fetch.
+        let result = reader
+            .get_byte_ranges(vec![0..100, 200..300, 500..600])
+            .await
+            .unwrap();
+
+        assert_eq!(result.len(), 3);
+        assert_eq!(result[0], expected_0);
+        assert_eq!(result[1], expected_1);
+        assert_eq!(result[2], expected_2);
+    }
+
+    #[tokio::test]
+    async fn test_get_byte_ranges_empty() {
+        use parquet::arrow::async_reader::AsyncFileReader;
+
+        let mock = MockFileRead::new(1024);
+        let mut reader =
+            super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock));
+
+        let result = reader.get_byte_ranges(vec![]).await.unwrap();
+        assert!(result.is_empty());
+    }
+
+    #[tokio::test]
+    async fn test_get_byte_ranges_coalesce_max() {
+        use parquet::arrow::async_reader::AsyncFileReader;
+
+        let mock = MockFileRead::new(2048);
+        let expected_0 = mock.data.slice(0..100);
+        let expected_1 = mock.data.slice(1500..1600);
+
+        let mut reader =
+            super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
+                .with_parquet_read_options(
+                    super::ParquetReadOptions::builder()
+                        .with_range_coalesce_bytes(u64::MAX)
+                        .build(),
+                );
+
+        // With a u64::MAX threshold, all ranges merge into a single fetch.
+        let result = reader
+            .get_byte_ranges(vec![0..100, 1500..1600])
+            .await
+            .unwrap();
+
+        assert_eq!(result.len(), 2);
+        assert_eq!(result[0], expected_0);
+        assert_eq!(result[1], expected_1);
+    }
+
+    #[tokio::test]
+    async fn test_get_byte_ranges_concurrency_zero() {
+        use parquet::arrow::async_reader::AsyncFileReader;
+
+        // concurrency=0 is clamped to 1, so this should not hang.
+        let mock = MockFileRead::new(1024);
+        let expected = mock.data.slice(0..100);
+
+        let mut reader =
+            super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock))
+                .with_parquet_read_options(
+                    super::ParquetReadOptions::builder()
+                        .with_range_fetch_concurrency(0)
+                        .build(),
+                );
+
+        let result = reader
+            .get_byte_ranges(vec![0..100, 200..300])
+            .await
+            .unwrap();
+        assert_eq!(result.len(), 2);
+        assert_eq!(result[0], expected);
+    }
+
+    #[tokio::test]
+    async fn test_get_byte_ranges_concurrency_one() {
+        use parquet::arrow::async_reader::AsyncFileReader;
+
+        let mock = MockFileRead::new(2048);
+        let expected_0 = mock.data.slice(0..100);
+        let expected_1 = mock.data.slice(500..600);
+        let expected_2 = mock.data.slice(1500..1600);
+
+        let mut reader =
+            super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock))
+                .with_parquet_read_options(
+                    super::ParquetReadOptions::builder()
+                        .with_range_coalesce_bytes(0)
+                        .with_range_fetch_concurrency(1)
+                        .build(),
+                );
+
+        // concurrency=1 with no coalescing: fetches happen sequentially.
+        let result = reader
+            .get_byte_ranges(vec![0..100, 500..600, 1500..1600])
+            .await
+            .unwrap();
+
+        assert_eq!(result.len(), 3);
+        assert_eq!(result[0], expected_0);
+        assert_eq!(result[1], expected_1);
+        assert_eq!(result[2], expected_2);
+    }
 }
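
A note on the coalescing policy for reviewers, with a runnable sketch. The gap test in `merge_ranges` is "merge when the next range starts within `coalesce` bytes of the current range's end, or overlaps it". The compact restatement below mirrors that policy with `saturating_sub` instead of `checked_sub`/`unwrap_or`; the function name and the values in `main` are illustrative, not part of the patch:

```rust
use std::ops::Range;

/// Compact restatement of the patch's merge policy: sort by start, then fold
/// each range into the previous merged range when the gap between them is
/// <= `coalesce` (overlapping ranges have a saturated gap of 0, so they merge).
fn merge_ranges_sketch(ranges: &[Range<u64>], coalesce: u64) -> Vec<Range<u64>> {
    let mut sorted = ranges.to_vec();
    sorted.sort_unstable_by_key(|r| r.start);
    let mut merged: Vec<Range<u64>> = Vec::new();
    for r in sorted {
        match merged.last_mut() {
            Some(prev) if r.start.saturating_sub(prev.end) <= coalesce => {
                prev.end = prev.end.max(r.end);
            }
            _ => merged.push(r),
        }
    }
    merged
}

fn main() {
    // Three 100-byte column-chunk reads with 100- and 200-byte gaps.
    let requested = vec![0..100, 200..300, 500..600];
    // Under the 1 MiB default they collapse into one GET of 600 bytes:
    // one round-trip instead of three, at the cost of 300 wasted gap bytes.
    assert_eq!(merge_ranges_sketch(&requested, 1024 * 1024), vec![0..600]);
    // With coalescing disabled, each read stays its own request.
    assert_eq!(merge_ranges_sketch(&requested, 0), requested);
}
```

The trade-off the `range_coalesce_bytes` knob controls is exactly the one in the comments above: fewer, larger requests versus bytes fetched and thrown away in the gaps.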
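
For context on how these knobs compose from the reader side: the three setters added to `ArrowReaderBuilder` in this diff all forward into the shared `ParquetReadOptions`. A minimal crate-internal sketch, assuming the builder's constructor takes a `FileIO` as it does on current main (this will not compile outside the crate, since the builder is not public API):

```rust
// Inside crates/iceberg: tune Parquet I/O per reader. All three setters are
// added by this patch; the values here are illustrative, not recommendations.
let reader = ArrowReaderBuilder::new(file_io)
    .with_metadata_size_hint(256 * 1024) // prefetch 256 KiB of footer metadata
    .with_range_coalesce_bytes(2 * 1024 * 1024) // merge reads separated by <= 2 MiB
    .with_range_fetch_concurrency(16) // fetch up to 16 merged ranges in parallel
    .build();
```

Every data file read through this reader then shares one `ParquetReadOptions` value, with `preload_page_index` flipped per scan task, as `process_file_scan_task` does above.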