From e0db5191426e1dc07835be6597daa114a72b42e4 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 26 Feb 2026 07:49:14 -0500 Subject: [PATCH 1/6] Add get_byte_ranges and merge_ranges. --- crates/iceberg/src/arrow/reader.rs | 117 +++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 93dbdaa35d..9859141c71 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -1763,6 +1763,51 @@ impl AsyncFileReader for ArrowFileReader { ) } + /// Override the default `get_byte_ranges` which calls `get_bytes` sequentially. + /// The parquet reader calls this to fetch column chunks for a row group, so + /// without this override each column chunk is a serial round-trip to object storage. + /// Adapted from object_store's `coalesce_ranges` in `util.rs`. + fn get_byte_ranges( + &mut self, + ranges: Vec>, + ) -> BoxFuture<'_, parquet::errors::Result>> { + // Values match object_store's OBJECT_STORE_COALESCE_DEFAULT and + // OBJECT_STORE_COALESCE_PARALLEL. + const COALESCE_BYTES: u64 = 1024 * 1024; + const PARALLEL: usize = 10; + + async move { + // Merge nearby ranges to reduce the number of object store requests. + let fetch_ranges = merge_ranges(&ranges, COALESCE_BYTES); + let r = &self.r; + + // Fetch merged ranges concurrently. + let fetched: Vec = futures::stream::iter(fetch_ranges.iter().cloned()) + .map(|range| async move { + r.read(range) + .await + .map_err(|e| parquet::errors::ParquetError::External(Box::new(e))) + }) + .buffered(PARALLEL) + .try_collect() + .await?; + + // Slice the fetched data back into the originally requested ranges. + Ok(ranges + .iter() + .map(|range| { + let idx = fetch_ranges.partition_point(|v| v.start <= range.start) - 1; + let fetch_range = &fetch_ranges[idx]; + let fetch_bytes = &fetched[idx]; + let start = (range.start - fetch_range.start) as usize; + let end = (range.end - fetch_range.start) as usize; + fetch_bytes.slice(start..end.min(fetch_bytes.len())) + }) + .collect()) + } + .boxed() + } + // TODO: currently we don't respect `ArrowReaderOptions` cause it don't expose any method to access the option field // we will fix it after `v55.1.0` is released in https://github.com/apache/arrow-rs/issues/7393 fn get_metadata( @@ -1785,6 +1830,42 @@ impl AsyncFileReader for ArrowFileReader { } } +/// Merge overlapping or nearby byte ranges, combining ranges with gaps <= `coalesce` bytes. +/// Adapted from object_store's `merge_ranges` in `util.rs`. +fn merge_ranges(ranges: &[Range], coalesce: u64) -> Vec> { + if ranges.is_empty() { + return vec![]; + } + + let mut ranges = ranges.to_vec(); + ranges.sort_unstable_by_key(|r| r.start); + + let mut merged = Vec::with_capacity(ranges.len()); + let mut start_idx = 0; + let mut end_idx = 1; + + while start_idx != ranges.len() { + let mut range_end = ranges[start_idx].end; + + while end_idx != ranges.len() + && ranges[end_idx] + .start + .checked_sub(range_end) + .map(|delta| delta <= coalesce) + .unwrap_or(true) + { + range_end = range_end.max(ranges[end_idx].end); + end_idx += 1; + } + + merged.push(ranges[start_idx].start..range_end); + start_idx = end_idx; + end_idx += 1; + } + + merged +} + /// The Arrow type of an array that the Parquet reader reads may not match the exact Arrow type /// that Iceberg uses for literals - but they are effectively the same logical type, /// i.e. LargeUtf8 and Utf8 or Utf8View and Utf8 or Utf8View and LargeUtf8. @@ -1810,6 +1891,7 @@ fn try_cast_literal( mod tests { use std::collections::{HashMap, HashSet}; use std::fs::File; + use std::ops::Range; use std::sync::Arc; use arrow_array::cast::AsArray; @@ -4317,4 +4399,39 @@ message schema { assert_eq!(name_col.value(2), "Charlie"); assert_eq!(name_col.value(3), "Dave"); } + + #[test] + fn test_merge_ranges_empty() { + assert_eq!(super::merge_ranges(&[], 1024), Vec::>::new()); + } + + #[test] + fn test_merge_ranges_no_coalesce() { + // Ranges far apart should not be merged + let ranges = vec![0..100, 1_000_000..1_000_100]; + let merged = super::merge_ranges(&ranges, 1024); + assert_eq!(merged, vec![0..100, 1_000_000..1_000_100]); + } + + #[test] + fn test_merge_ranges_coalesce() { + // Ranges within the gap threshold should be merged + let ranges = vec![0..100, 200..300, 500..600]; + let merged = super::merge_ranges(&ranges, 1024); + assert_eq!(merged, vec![0..600]); + } + + #[test] + fn test_merge_ranges_overlapping() { + let ranges = vec![0..200, 100..300]; + let merged = super::merge_ranges(&ranges, 0); + assert_eq!(merged, vec![0..300]); + } + + #[test] + fn test_merge_ranges_unsorted() { + let ranges = vec![500..600, 0..100, 200..300]; + let merged = super::merge_ranges(&ranges, 1024); + assert_eq!(merged, vec![0..600]); + } } From 6e3ac0c48ff164774d240f779910a58a3ecfb881 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 26 Feb 2026 16:39:13 -0500 Subject: [PATCH 2/6] Add configs to builder. --- .../iceberg/src/arrow/delete_file_loader.rs | 3 + crates/iceberg/src/arrow/reader.rs | 76 +++++++++++++++++-- 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 33744d876f..554b42521a 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -20,6 +20,7 @@ use std::sync::Arc; use futures::{StreamExt, TryStreamExt}; use crate::arrow::ArrowReader; +use crate::arrow::reader::{DEFAULT_RANGE_COALESCE_BYTES, DEFAULT_RANGE_FETCH_CONCURRENCY}; use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; @@ -67,6 +68,8 @@ impl BasicDeleteFileLoader { None, None, file_size_in_bytes, + DEFAULT_RANGE_COALESCE_BYTES, + DEFAULT_RANGE_FETCH_CONCURRENCY, ) .await? .build()? diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 9859141c71..c72cd28922 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -60,6 +60,13 @@ use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; +/// Default coalesce threshold for merging nearby byte ranges. +/// Matches object_store's OBJECT_STORE_COALESCE_DEFAULT. +pub(crate) const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024; +/// Default number of merged byte ranges to fetch concurrently. +/// Matches object_store's OBJECT_STORE_COALESCE_PARALLEL. +pub(crate) const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10; + /// Builder to create ArrowReader pub struct ArrowReaderBuilder { batch_size: Option, @@ -68,6 +75,8 @@ pub struct ArrowReaderBuilder { row_group_filtering_enabled: bool, row_selection_enabled: bool, metadata_size_hint: Option, + range_coalesce_bytes: u64, + range_fetch_concurrency: usize, } impl ArrowReaderBuilder { @@ -82,6 +91,8 @@ impl ArrowReaderBuilder { row_group_filtering_enabled: true, row_selection_enabled: false, metadata_size_hint: None, + range_coalesce_bytes: DEFAULT_RANGE_COALESCE_BYTES, + range_fetch_concurrency: DEFAULT_RANGE_FETCH_CONCURRENCY, } } @@ -119,6 +130,23 @@ impl ArrowReaderBuilder { self } + /// Sets the gap threshold for merging nearby byte ranges into a single request. + /// Ranges with gaps smaller than this value will be coalesced. + /// + /// Defaults to 1 MiB, matching object_store's OBJECT_STORE_COALESCE_DEFAULT. + pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self { + self.range_coalesce_bytes = range_coalesce_bytes; + self + } + + /// Sets the maximum number of merged byte ranges to fetch concurrently. + /// + /// Defaults to 10, matching object_store's OBJECT_STORE_COALESCE_PARALLEL. + pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self { + self.range_fetch_concurrency = range_fetch_concurrency; + self + } + /// Build the ArrowReader. pub fn build(self) -> ArrowReader { ArrowReader { @@ -132,6 +160,8 @@ impl ArrowReaderBuilder { row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, metadata_size_hint: self.metadata_size_hint, + range_coalesce_bytes: self.range_coalesce_bytes, + range_fetch_concurrency: self.range_fetch_concurrency, } } } @@ -149,6 +179,8 @@ pub struct ArrowReader { row_group_filtering_enabled: bool, row_selection_enabled: bool, metadata_size_hint: Option, + range_coalesce_bytes: u64, + range_fetch_concurrency: usize, } impl ArrowReader { @@ -161,6 +193,8 @@ impl ArrowReader { let row_group_filtering_enabled = self.row_group_filtering_enabled; let row_selection_enabled = self.row_selection_enabled; let metadata_size_hint = self.metadata_size_hint; + let range_coalesce_bytes = self.range_coalesce_bytes; + let range_fetch_concurrency = self.range_fetch_concurrency; // Fast-path for single concurrency to avoid overhead of try_flatten_unordered let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 { @@ -177,6 +211,8 @@ impl ArrowReader { row_group_filtering_enabled, row_selection_enabled, metadata_size_hint, + range_coalesce_bytes, + range_fetch_concurrency, ) }) .map_err(|err| { @@ -199,6 +235,8 @@ impl ArrowReader { row_group_filtering_enabled, row_selection_enabled, metadata_size_hint, + range_coalesce_bytes, + range_fetch_concurrency, ) }) .map_err(|err| { @@ -222,6 +260,8 @@ impl ArrowReader { row_group_filtering_enabled: bool, row_selection_enabled: bool, metadata_size_hint: Option, + range_coalesce_bytes: u64, + range_fetch_concurrency: usize, ) -> Result { let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); @@ -238,6 +278,8 @@ impl ArrowReader { None, metadata_size_hint, task.file_size_in_bytes, + range_coalesce_bytes, + range_fetch_concurrency, ) .await?; @@ -292,6 +334,8 @@ impl ArrowReader { Some(options), metadata_size_hint, task.file_size_in_bytes, + range_coalesce_bytes, + range_fetch_concurrency, ) .await? } else { @@ -497,6 +541,8 @@ impl ArrowReader { arrow_reader_options: Option, metadata_size_hint: Option, file_size_in_bytes: u64, + range_coalesce_bytes: u64, + range_fetch_concurrency: usize, ) -> Result> { // Get the metadata for the Parquet file we need to read and build // a reader for the data within @@ -510,7 +556,9 @@ impl ArrowReader { ) .with_preload_column_index(true) .with_preload_offset_index(true) - .with_preload_page_index(should_load_page_index); + .with_preload_page_index(should_load_page_index) + .with_range_coalesce_bytes(range_coalesce_bytes) + .with_range_fetch_concurrency(range_fetch_concurrency); if let Some(hint) = metadata_size_hint { parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint); @@ -1710,6 +1758,8 @@ pub struct ArrowFileReader { preload_offset_index: bool, preload_page_index: bool, metadata_size_hint: Option, + range_coalesce_bytes: u64, + range_fetch_concurrency: usize, r: Box, } @@ -1722,6 +1772,8 @@ impl ArrowFileReader { preload_offset_index: false, preload_page_index: false, metadata_size_hint: None, + range_coalesce_bytes: DEFAULT_RANGE_COALESCE_BYTES, + range_fetch_concurrency: DEFAULT_RANGE_FETCH_CONCURRENCY, r, } } @@ -1752,6 +1804,18 @@ impl ArrowFileReader { self.metadata_size_hint = Some(hint); self } + + /// Sets the gap threshold for merging nearby byte ranges into a single request. + pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self { + self.range_coalesce_bytes = range_coalesce_bytes; + self + } + + /// Sets the maximum number of merged byte ranges to fetch concurrently. + pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self { + self.range_fetch_concurrency = range_fetch_concurrency; + self + } } impl AsyncFileReader for ArrowFileReader { @@ -1771,14 +1835,12 @@ impl AsyncFileReader for ArrowFileReader { &mut self, ranges: Vec>, ) -> BoxFuture<'_, parquet::errors::Result>> { - // Values match object_store's OBJECT_STORE_COALESCE_DEFAULT and - // OBJECT_STORE_COALESCE_PARALLEL. - const COALESCE_BYTES: u64 = 1024 * 1024; - const PARALLEL: usize = 10; + let coalesce_bytes = self.range_coalesce_bytes; + let concurrency = self.range_fetch_concurrency; async move { // Merge nearby ranges to reduce the number of object store requests. - let fetch_ranges = merge_ranges(&ranges, COALESCE_BYTES); + let fetch_ranges = merge_ranges(&ranges, coalesce_bytes); let r = &self.r; // Fetch merged ranges concurrently. @@ -1788,7 +1850,7 @@ impl AsyncFileReader for ArrowFileReader { .await .map_err(|e| parquet::errors::ParquetError::External(Box::new(e))) }) - .buffered(PARALLEL) + .buffered(concurrency) .try_collect() .await?; From f44762d12f237cafa19bd5538c1444fffc63db4d Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 26 Feb 2026 18:18:18 -0500 Subject: [PATCH 3/6] fix clippy --- .../iceberg/src/arrow/delete_file_loader.rs | 6 +- crates/iceberg/src/arrow/reader.rs | 89 +++++++++---------- 2 files changed, 43 insertions(+), 52 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 554b42521a..cfdb87d2ca 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use futures::{StreamExt, TryStreamExt}; use crate::arrow::ArrowReader; -use crate::arrow::reader::{DEFAULT_RANGE_COALESCE_BYTES, DEFAULT_RANGE_FETCH_CONCURRENCY}; +use crate::arrow::reader::ParquetReadOptions; use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; use crate::io::FileIO; use crate::scan::{ArrowRecordBatchStream, FileScanTaskDeleteFile}; @@ -66,10 +66,8 @@ impl BasicDeleteFileLoader { self.file_io.clone(), false, None, - None, file_size_in_bytes, - DEFAULT_RANGE_COALESCE_BYTES, - DEFAULT_RANGE_FETCH_CONCURRENCY, + ParquetReadOptions::default(), ) .await? .build()? diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index c72cd28922..fc11574198 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -60,12 +60,27 @@ use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; -/// Default coalesce threshold for merging nearby byte ranges. -/// Matches object_store's OBJECT_STORE_COALESCE_DEFAULT. -pub(crate) const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024; -/// Default number of merged byte ranges to fetch concurrently. -/// Matches object_store's OBJECT_STORE_COALESCE_PARALLEL. -pub(crate) const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10; +/// Options for tuning Parquet file I/O. +#[derive(Clone, Copy, Debug)] +pub(crate) struct ParquetReadOptions { + pub metadata_size_hint: Option, + /// Gap threshold for merging nearby byte ranges into a single request. + pub range_coalesce_bytes: u64, + /// Maximum number of merged byte ranges to fetch concurrently. + pub range_fetch_concurrency: usize, +} + +impl Default for ParquetReadOptions { + /// Defaults match object_store's OBJECT_STORE_COALESCE_DEFAULT and + /// OBJECT_STORE_COALESCE_PARALLEL. + fn default() -> Self { + Self { + metadata_size_hint: None, + range_coalesce_bytes: 1024 * 1024, + range_fetch_concurrency: 10, + } + } +} /// Builder to create ArrowReader pub struct ArrowReaderBuilder { @@ -74,9 +89,7 @@ pub struct ArrowReaderBuilder { concurrency_limit_data_files: usize, row_group_filtering_enabled: bool, row_selection_enabled: bool, - metadata_size_hint: Option, - range_coalesce_bytes: u64, - range_fetch_concurrency: usize, + parquet_read_options: ParquetReadOptions, } impl ArrowReaderBuilder { @@ -90,9 +103,7 @@ impl ArrowReaderBuilder { concurrency_limit_data_files: num_cpus, row_group_filtering_enabled: true, row_selection_enabled: false, - metadata_size_hint: None, - range_coalesce_bytes: DEFAULT_RANGE_COALESCE_BYTES, - range_fetch_concurrency: DEFAULT_RANGE_FETCH_CONCURRENCY, + parquet_read_options: ParquetReadOptions::default(), } } @@ -126,7 +137,7 @@ impl ArrowReaderBuilder { /// This hint can help reduce the number of fetch requests. For more details see the /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint). pub fn with_metadata_size_hint(mut self, metadata_size_hint: usize) -> Self { - self.metadata_size_hint = Some(metadata_size_hint); + self.parquet_read_options.metadata_size_hint = Some(metadata_size_hint); self } @@ -135,7 +146,7 @@ impl ArrowReaderBuilder { /// /// Defaults to 1 MiB, matching object_store's OBJECT_STORE_COALESCE_DEFAULT. pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self { - self.range_coalesce_bytes = range_coalesce_bytes; + self.parquet_read_options.range_coalesce_bytes = range_coalesce_bytes; self } @@ -143,7 +154,7 @@ impl ArrowReaderBuilder { /// /// Defaults to 10, matching object_store's OBJECT_STORE_COALESCE_PARALLEL. pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self { - self.range_fetch_concurrency = range_fetch_concurrency; + self.parquet_read_options.range_fetch_concurrency = range_fetch_concurrency; self } @@ -159,9 +170,7 @@ impl ArrowReaderBuilder { concurrency_limit_data_files: self.concurrency_limit_data_files, row_group_filtering_enabled: self.row_group_filtering_enabled, row_selection_enabled: self.row_selection_enabled, - metadata_size_hint: self.metadata_size_hint, - range_coalesce_bytes: self.range_coalesce_bytes, - range_fetch_concurrency: self.range_fetch_concurrency, + parquet_read_options: self.parquet_read_options, } } } @@ -178,9 +187,7 @@ pub struct ArrowReader { row_group_filtering_enabled: bool, row_selection_enabled: bool, - metadata_size_hint: Option, - range_coalesce_bytes: u64, - range_fetch_concurrency: usize, + parquet_read_options: ParquetReadOptions, } impl ArrowReader { @@ -192,9 +199,7 @@ impl ArrowReader { let concurrency_limit_data_files = self.concurrency_limit_data_files; let row_group_filtering_enabled = self.row_group_filtering_enabled; let row_selection_enabled = self.row_selection_enabled; - let metadata_size_hint = self.metadata_size_hint; - let range_coalesce_bytes = self.range_coalesce_bytes; - let range_fetch_concurrency = self.range_fetch_concurrency; + let parquet_read_options = self.parquet_read_options; // Fast-path for single concurrency to avoid overhead of try_flatten_unordered let stream: ArrowRecordBatchStream = if concurrency_limit_data_files == 1 { @@ -210,9 +215,7 @@ impl ArrowReader { self.delete_file_loader.clone(), row_group_filtering_enabled, row_selection_enabled, - metadata_size_hint, - range_coalesce_bytes, - range_fetch_concurrency, + parquet_read_options, ) }) .map_err(|err| { @@ -234,9 +237,7 @@ impl ArrowReader { self.delete_file_loader.clone(), row_group_filtering_enabled, row_selection_enabled, - metadata_size_hint, - range_coalesce_bytes, - range_fetch_concurrency, + parquet_read_options, ) }) .map_err(|err| { @@ -251,7 +252,6 @@ impl ArrowReader { Ok(stream) } - #[allow(clippy::too_many_arguments)] async fn process_file_scan_task( task: FileScanTask, batch_size: Option, @@ -259,9 +259,7 @@ impl ArrowReader { delete_file_loader: CachingDeleteFileLoader, row_group_filtering_enabled: bool, row_selection_enabled: bool, - metadata_size_hint: Option, - range_coalesce_bytes: u64, - range_fetch_concurrency: usize, + parquet_read_options: ParquetReadOptions, ) -> Result { let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); @@ -276,10 +274,8 @@ impl ArrowReader { file_io.clone(), should_load_page_index, None, - metadata_size_hint, task.file_size_in_bytes, - range_coalesce_bytes, - range_fetch_concurrency, + parquet_read_options, ) .await?; @@ -332,10 +328,8 @@ impl ArrowReader { file_io.clone(), should_load_page_index, Some(options), - metadata_size_hint, task.file_size_in_bytes, - range_coalesce_bytes, - range_fetch_concurrency, + parquet_read_options, ) .await? } else { @@ -539,10 +533,8 @@ impl ArrowReader { file_io: FileIO, should_load_page_index: bool, arrow_reader_options: Option, - metadata_size_hint: Option, file_size_in_bytes: u64, - range_coalesce_bytes: u64, - range_fetch_concurrency: usize, + parquet_read_options: ParquetReadOptions, ) -> Result> { // Get the metadata for the Parquet file we need to read and build // a reader for the data within @@ -557,10 +549,10 @@ impl ArrowReader { .with_preload_column_index(true) .with_preload_offset_index(true) .with_preload_page_index(should_load_page_index) - .with_range_coalesce_bytes(range_coalesce_bytes) - .with_range_fetch_concurrency(range_fetch_concurrency); + .with_range_coalesce_bytes(parquet_read_options.range_coalesce_bytes) + .with_range_fetch_concurrency(parquet_read_options.range_fetch_concurrency); - if let Some(hint) = metadata_size_hint { + if let Some(hint) = parquet_read_options.metadata_size_hint { parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint); } @@ -1766,14 +1758,15 @@ pub struct ArrowFileReader { impl ArrowFileReader { /// Create a new ArrowFileReader pub fn new(meta: FileMetadata, r: Box) -> Self { + let defaults = ParquetReadOptions::default(); Self { meta, preload_column_index: false, preload_offset_index: false, preload_page_index: false, metadata_size_hint: None, - range_coalesce_bytes: DEFAULT_RANGE_COALESCE_BYTES, - range_fetch_concurrency: DEFAULT_RANGE_FETCH_CONCURRENCY, + range_coalesce_bytes: defaults.range_coalesce_bytes, + range_fetch_concurrency: defaults.range_fetch_concurrency, r, } } From 474b03854ea112637102a4a451ca5fcc47315f8b Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Tue, 3 Mar 2026 08:20:40 -0500 Subject: [PATCH 4/6] Address PR feedback. --- crates/iceberg/src/arrow/reader.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index fc11574198..9bbac32819 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -60,6 +60,14 @@ use crate::spec::{Datum, NameMapping, NestedField, PrimitiveType, Schema, Type}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; +/// Default gap between byte ranges below which they are coalesced into a +/// single request. Matches object_store's `OBJECT_STORE_COALESCE_DEFAULT`. +const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024; + +/// Default maximum number of coalesced byte ranges fetched concurrently. +/// Matches object_store's `OBJECT_STORE_COALESCE_PARALLEL`. +const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10; + /// Options for tuning Parquet file I/O. #[derive(Clone, Copy, Debug)] pub(crate) struct ParquetReadOptions { @@ -71,13 +79,11 @@ pub(crate) struct ParquetReadOptions { } impl Default for ParquetReadOptions { - /// Defaults match object_store's OBJECT_STORE_COALESCE_DEFAULT and - /// OBJECT_STORE_COALESCE_PARALLEL. fn default() -> Self { Self { metadata_size_hint: None, - range_coalesce_bytes: 1024 * 1024, - range_fetch_concurrency: 10, + range_coalesce_bytes: DEFAULT_RANGE_COALESCE_BYTES, + range_fetch_concurrency: DEFAULT_RANGE_FETCH_CONCURRENCY, } } } From 59d6349c92cf7283339ce0071ee338428a855190 Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 5 Mar 2026 09:48:05 -0500 Subject: [PATCH 5/6] Address PR feedback. --- .../iceberg/src/arrow/delete_file_loader.rs | 3 +- crates/iceberg/src/arrow/reader.rs | 247 ++++++++++++------ 2 files changed, 168 insertions(+), 82 deletions(-) diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index d412f88e47..cb9ff9a8d5 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -64,10 +64,9 @@ impl BasicDeleteFileLoader { let record_batch_stream = ArrowReader::create_parquet_record_batch_stream_builder( data_file_path, self.file_io.clone(), - false, None, file_size_in_bytes, - ParquetReadOptions::default(), + ParquetReadOptions::builder().build(), ) .await? .build()? diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index d829ae63b6..e83605b877 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -43,6 +43,7 @@ use parquet::file::metadata::{ PageIndexPolicy, ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData, }; use parquet::schema::types::{SchemaDescriptor, Type as ParquetType}; +use typed_builder::TypedBuilder; use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; @@ -68,23 +69,67 @@ const DEFAULT_RANGE_COALESCE_BYTES: u64 = 1024 * 1024; /// Matches object_store's `OBJECT_STORE_COALESCE_PARALLEL`. const DEFAULT_RANGE_FETCH_CONCURRENCY: usize = 10; +/// Default number of bytes to prefetch when parsing Parquet footer metadata. +/// Matches DataFusion's default `ParquetOptions::metadata_size_hint`. +const DEFAULT_METADATA_SIZE_HINT: usize = 512 * 1024; + /// Options for tuning Parquet file I/O. -#[derive(Clone, Copy, Debug)] +#[derive(Clone, Copy, Debug, TypedBuilder)] +#[builder(field_defaults(setter(prefix = "with_")))] pub(crate) struct ParquetReadOptions { - pub metadata_size_hint: Option, + /// Number of bytes to prefetch for parsing the Parquet metadata. + /// + /// This hint can help reduce the number of fetch requests. For more details see the + /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint). + /// + /// Defaults to 512 KiB, matching DataFusion's default `ParquetOptions::metadata_size_hint`. + #[builder(default = Some(DEFAULT_METADATA_SIZE_HINT))] + pub(crate) metadata_size_hint: Option, /// Gap threshold for merging nearby byte ranges into a single request. - pub range_coalesce_bytes: u64, + /// Ranges with gaps smaller than this value will be coalesced. + /// + /// Defaults to 1 MiB, matching object_store's `OBJECT_STORE_COALESCE_DEFAULT`. + #[builder(default = DEFAULT_RANGE_COALESCE_BYTES)] + pub(crate) range_coalesce_bytes: u64, /// Maximum number of merged byte ranges to fetch concurrently. - pub range_fetch_concurrency: usize, + /// + /// Defaults to 10, matching object_store's `OBJECT_STORE_COALESCE_PARALLEL`. + #[builder(default = DEFAULT_RANGE_FETCH_CONCURRENCY)] + pub(crate) range_fetch_concurrency: usize, + /// Whether to preload the column index when reading Parquet metadata. + #[builder(default = true)] + pub(crate) preload_column_index: bool, + /// Whether to preload the offset index when reading Parquet metadata. + #[builder(default = true)] + pub(crate) preload_offset_index: bool, + /// Whether to preload the page index when reading Parquet metadata. + #[builder(default = false)] + pub(crate) preload_page_index: bool, } -impl Default for ParquetReadOptions { - fn default() -> Self { - Self { - metadata_size_hint: None, - range_coalesce_bytes: DEFAULT_RANGE_COALESCE_BYTES, - range_fetch_concurrency: DEFAULT_RANGE_FETCH_CONCURRENCY, - } +impl ParquetReadOptions { + pub(crate) fn metadata_size_hint(&self) -> Option { + self.metadata_size_hint + } + + pub(crate) fn range_coalesce_bytes(&self) -> u64 { + self.range_coalesce_bytes + } + + pub(crate) fn range_fetch_concurrency(&self) -> usize { + self.range_fetch_concurrency + } + + pub(crate) fn preload_column_index(&self) -> bool { + self.preload_column_index + } + + pub(crate) fn preload_offset_index(&self) -> bool { + self.preload_offset_index + } + + pub(crate) fn preload_page_index(&self) -> bool { + self.preload_page_index } } @@ -109,7 +154,7 @@ impl ArrowReaderBuilder { concurrency_limit_data_files: num_cpus, row_group_filtering_enabled: true, row_selection_enabled: false, - parquet_read_options: ParquetReadOptions::default(), + parquet_read_options: ParquetReadOptions::builder().build(), } } @@ -269,6 +314,8 @@ impl ArrowReader { ) -> Result { let should_load_page_index = (row_selection_enabled && task.predicate.is_some()) || !task.deletes.is_empty(); + let mut parquet_read_options = parquet_read_options; + parquet_read_options.preload_page_index = should_load_page_index; let delete_filter_rx = delete_file_loader.load_deletes(&task.deletes, Arc::clone(&task.schema)); @@ -278,7 +325,6 @@ impl ArrowReader { let initial_stream_builder = Self::create_parquet_record_batch_stream_builder( &task.data_file_path, file_io.clone(), - should_load_page_index, None, task.file_size_in_bytes, parquet_read_options, @@ -332,7 +378,6 @@ impl ArrowReader { Self::create_parquet_record_batch_stream_builder( &task.data_file_path, file_io.clone(), - should_load_page_index, Some(options), task.file_size_in_bytes, parquet_read_options, @@ -537,7 +582,6 @@ impl ArrowReader { pub(crate) async fn create_parquet_record_batch_stream_builder( data_file_path: &str, file_io: FileIO, - should_load_page_index: bool, arrow_reader_options: Option, file_size_in_bytes: u64, parquet_read_options: ParquetReadOptions, @@ -546,21 +590,13 @@ impl ArrowReader { // a reader for the data within let parquet_file = file_io.new_input(data_file_path)?; let parquet_reader = parquet_file.reader().await?; - let mut parquet_file_reader = ArrowFileReader::new( + let parquet_file_reader = ArrowFileReader::new( FileMetadata { size: file_size_in_bytes, }, parquet_reader, ) - .with_preload_column_index(true) - .with_preload_offset_index(true) - .with_preload_page_index(should_load_page_index) - .with_range_coalesce_bytes(parquet_read_options.range_coalesce_bytes) - .with_range_fetch_concurrency(parquet_read_options.range_fetch_concurrency); - - if let Some(hint) = parquet_read_options.metadata_size_hint { - parquet_file_reader = parquet_file_reader.with_metadata_size_hint(hint); - } + .with_parquet_read_options(parquet_read_options); // Create the record batch stream builder, which wraps the parquet file reader let options = arrow_reader_options.unwrap_or_default(); @@ -1752,67 +1788,23 @@ impl BoundPredicateVisitor for PredicateConverter<'_> { /// ArrowFileReader is a wrapper around a FileRead that impls parquets AsyncFileReader. pub struct ArrowFileReader { meta: FileMetadata, - preload_column_index: bool, - preload_offset_index: bool, - preload_page_index: bool, - metadata_size_hint: Option, - range_coalesce_bytes: u64, - range_fetch_concurrency: usize, + parquet_read_options: ParquetReadOptions, r: Box, } impl ArrowFileReader { /// Create a new ArrowFileReader pub fn new(meta: FileMetadata, r: Box) -> Self { - let defaults = ParquetReadOptions::default(); Self { meta, - preload_column_index: false, - preload_offset_index: false, - preload_page_index: false, - metadata_size_hint: None, - range_coalesce_bytes: defaults.range_coalesce_bytes, - range_fetch_concurrency: defaults.range_fetch_concurrency, + parquet_read_options: ParquetReadOptions::builder().build(), r, } } - /// Enable or disable preloading of the column index - pub fn with_preload_column_index(mut self, preload: bool) -> Self { - self.preload_column_index = preload; - self - } - - /// Enable or disable preloading of the offset index - pub fn with_preload_offset_index(mut self, preload: bool) -> Self { - self.preload_offset_index = preload; - self - } - - /// Enable or disable preloading of the page index - pub fn with_preload_page_index(mut self, preload: bool) -> Self { - self.preload_page_index = preload; - self - } - - /// Provide a hint as to the number of bytes to prefetch for parsing the Parquet metadata - /// - /// This hint can help reduce the number of fetch requests. For more details see the - /// [ParquetMetaDataReader documentation](https://docs.rs/parquet/latest/parquet/file/metadata/struct.ParquetMetaDataReader.html#method.with_prefetch_hint). - pub fn with_metadata_size_hint(mut self, hint: usize) -> Self { - self.metadata_size_hint = Some(hint); - self - } - - /// Sets the gap threshold for merging nearby byte ranges into a single request. - pub fn with_range_coalesce_bytes(mut self, range_coalesce_bytes: u64) -> Self { - self.range_coalesce_bytes = range_coalesce_bytes; - self - } - - /// Sets the maximum number of merged byte ranges to fetch concurrently. - pub fn with_range_fetch_concurrency(mut self, range_fetch_concurrency: usize) -> Self { - self.range_fetch_concurrency = range_fetch_concurrency; + /// Configure all Parquet read options. + pub(crate) fn with_parquet_read_options(mut self, options: ParquetReadOptions) -> Self { + self.parquet_read_options = options; self } } @@ -1834,8 +1826,8 @@ impl AsyncFileReader for ArrowFileReader { &mut self, ranges: Vec>, ) -> BoxFuture<'_, parquet::errors::Result>> { - let coalesce_bytes = self.range_coalesce_bytes; - let concurrency = self.range_fetch_concurrency; + let coalesce_bytes = self.parquet_read_options.range_coalesce_bytes(); + let concurrency = self.parquet_read_options.range_fetch_concurrency(); async move { // Merge nearby ranges to reduce the number of object store requests. @@ -1877,11 +1869,17 @@ impl AsyncFileReader for ArrowFileReader { ) -> BoxFuture<'_, parquet::errors::Result>> { async move { let reader = ParquetMetaDataReader::new() - .with_prefetch_hint(self.metadata_size_hint) + .with_prefetch_hint(self.parquet_read_options.metadata_size_hint()) // Set the page policy first because it updates both column and offset policies. - .with_page_index_policy(PageIndexPolicy::from(self.preload_page_index)) - .with_column_index_policy(PageIndexPolicy::from(self.preload_column_index)) - .with_offset_index_policy(PageIndexPolicy::from(self.preload_offset_index)); + .with_page_index_policy(PageIndexPolicy::from( + self.parquet_read_options.preload_page_index(), + )) + .with_column_index_policy(PageIndexPolicy::from( + self.parquet_read_options.preload_column_index(), + )) + .with_offset_index_policy(PageIndexPolicy::from( + self.parquet_read_options.preload_offset_index(), + )); let size = self.meta.size; let meta = reader.load_and_finish(self, size).await?; @@ -4495,4 +4493,93 @@ message schema { let merged = super::merge_ranges(&ranges, 1024); assert_eq!(merged, vec![0..600]); } + + /// Mock FileRead backed by a flat byte buffer. + struct MockFileRead { + data: bytes::Bytes, + } + + impl MockFileRead { + fn new(size: usize) -> Self { + // Fill with sequential byte values so slices are verifiable. + let data: Vec = (0..size).map(|i| (i % 256) as u8).collect(); + Self { + data: bytes::Bytes::from(data), + } + } + } + + #[async_trait::async_trait] + impl crate::io::FileRead for MockFileRead { + async fn read(&self, range: Range) -> crate::Result { + Ok(self.data.slice(range.start as usize..range.end as usize)) + } + } + + #[tokio::test] + async fn test_get_byte_ranges_no_coalesce() { + use parquet::arrow::async_reader::AsyncFileReader; + + let mock = MockFileRead::new(2048); + let expected_0 = mock.data.slice(0..100); + let expected_1 = mock.data.slice(1500..1600); + + let mut reader = + super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock)) + .with_parquet_read_options( + super::ParquetReadOptions::builder() + .with_range_coalesce_bytes(0) + .build(), + ); + + let result = reader + .get_byte_ranges(vec![0..100, 1500..1600]) + .await + .unwrap(); + + assert_eq!(result.len(), 2); + assert_eq!(result[0], expected_0); + assert_eq!(result[1], expected_1); + } + + #[tokio::test] + async fn test_get_byte_ranges_with_coalesce() { + use parquet::arrow::async_reader::AsyncFileReader; + + let mock = MockFileRead::new(1024); + let expected_0 = mock.data.slice(0..100); + let expected_1 = mock.data.slice(200..300); + let expected_2 = mock.data.slice(500..600); + + let mut reader = + super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock)) + .with_parquet_read_options( + super::ParquetReadOptions::builder() + .with_range_coalesce_bytes(1024) + .build(), + ); + + // All ranges within coalesce threshold — should merge into one fetch. + let result = reader + .get_byte_ranges(vec![0..100, 200..300, 500..600]) + .await + .unwrap(); + + assert_eq!(result.len(), 3); + assert_eq!(result[0], expected_0); + assert_eq!(result[1], expected_1); + assert_eq!(result[2], expected_2); + } + + #[tokio::test] + async fn test_get_byte_ranges_empty() { + use parquet::arrow::async_reader::AsyncFileReader; + + let mock = MockFileRead::new(1024); + let mut reader = + super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock)); + + let result = reader.get_byte_ranges(vec![]).await.unwrap(); + assert!(result.is_empty()); + } } From 535d5fafc63790da6638bce826be0b9e8580e45a Mon Sep 17 00:00:00 2001 From: Matt Butrovich Date: Thu, 5 Mar 2026 10:14:36 -0500 Subject: [PATCH 6/6] More tests for edge cases. --- crates/iceberg/src/arrow/reader.rs | 83 +++++++++++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index e83605b877..128d9703ef 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -1827,7 +1827,7 @@ impl AsyncFileReader for ArrowFileReader { ranges: Vec>, ) -> BoxFuture<'_, parquet::errors::Result>> { let coalesce_bytes = self.parquet_read_options.range_coalesce_bytes(); - let concurrency = self.parquet_read_options.range_fetch_concurrency(); + let concurrency = self.parquet_read_options.range_fetch_concurrency().max(1); async move { // Merge nearby ranges to reduce the number of object store requests. @@ -4582,4 +4582,85 @@ message schema { let result = reader.get_byte_ranges(vec![]).await.unwrap(); assert!(result.is_empty()); } + + #[tokio::test] + async fn test_get_byte_ranges_coalesce_max() { + use parquet::arrow::async_reader::AsyncFileReader; + + let mock = MockFileRead::new(2048); + let expected_0 = mock.data.slice(0..100); + let expected_1 = mock.data.slice(1500..1600); + + let mut reader = + super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock)) + .with_parquet_read_options( + super::ParquetReadOptions::builder() + .with_range_coalesce_bytes(u64::MAX) + .build(), + ); + + // u64::MAX coalesce — all ranges merge into a single fetch. + let result = reader + .get_byte_ranges(vec![0..100, 1500..1600]) + .await + .unwrap(); + + assert_eq!(result.len(), 2); + assert_eq!(result[0], expected_0); + assert_eq!(result[1], expected_1); + } + + #[tokio::test] + async fn test_get_byte_ranges_concurrency_zero() { + use parquet::arrow::async_reader::AsyncFileReader; + + // concurrency=0 is clamped to 1, so this should not hang. + let mock = MockFileRead::new(1024); + let expected = mock.data.slice(0..100); + + let mut reader = + super::ArrowFileReader::new(crate::io::FileMetadata { size: 1024 }, Box::new(mock)) + .with_parquet_read_options( + super::ParquetReadOptions::builder() + .with_range_fetch_concurrency(0) + .build(), + ); + + let result = reader + .get_byte_ranges(vec![0..100, 200..300]) + .await + .unwrap(); + assert_eq!(result.len(), 2); + assert_eq!(result[0], expected); + } + + #[tokio::test] + async fn test_get_byte_ranges_concurrency_one() { + use parquet::arrow::async_reader::AsyncFileReader; + + let mock = MockFileRead::new(2048); + let expected_0 = mock.data.slice(0..100); + let expected_1 = mock.data.slice(500..600); + let expected_2 = mock.data.slice(1500..1600); + + let mut reader = + super::ArrowFileReader::new(crate::io::FileMetadata { size: 2048 }, Box::new(mock)) + .with_parquet_read_options( + super::ParquetReadOptions::builder() + .with_range_coalesce_bytes(0) + .with_range_fetch_concurrency(1) + .build(), + ); + + // concurrency=1 with no coalescing — sequential fetches. + let result = reader + .get_byte_ranges(vec![0..100, 500..600, 1500..1600]) + .await + .unwrap(); + + assert_eq!(result.len(), 3); + assert_eq!(result[0], expected_0); + assert_eq!(result[1], expected_1); + assert_eq!(result[2], expected_2); + } }