Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,12 @@ config_namespace! {
/// Number of files to read in parallel when inferring schema and statistics
pub meta_fetch_concurrency: usize, default = 32

/// Number of files to prefetch (open concurrently) within a single FileStream partition.
/// This controls the balance between I/O and CPU utilization. Higher values can improve
/// I/O throughput by opening more files in parallel while processing current files,
/// but use more memory and file handles. Setting to 1 maintains current behavior.
pub file_prefetch_depth: usize, default = 1

/// Guarantees a minimum level of output files running in parallel.
/// RecordBatches will be distributed in round robin fashion to each
/// parallel writer. Each writer is closed and a new file opened once
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1286,6 +1286,9 @@ impl TableProvider for ListingTable {
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols)
.with_expr_adapter(self.expr_adapter_factory.clone())
.with_file_prefetch_depth(
state.config_options().execution.file_prefetch_depth,
)
.build(),
)
.await?;
Expand Down
17 changes: 17 additions & 0 deletions datafusion/datasource/src/file_scan_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ pub struct FileScanConfig {
/// Expression adapter used to adapt filters and projections that are pushed down into the scan
/// from the logical schema to the physical schema of the file.
pub expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
/// Number of files to prefetch (open concurrently) within a single FileStream partition.
/// This controls the balance between I/O and CPU utilization. Higher values can improve
/// I/O throughput but use more memory and file handles. Defaults to 1.
pub file_prefetch_depth: usize,
}

/// A builder for [`FileScanConfig`]'s.
Expand Down Expand Up @@ -275,6 +279,7 @@ pub struct FileScanConfigBuilder {
new_lines_in_values: Option<bool>,
batch_size: Option<usize>,
expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
file_prefetch_depth: Option<usize>,
}

impl FileScanConfigBuilder {
Expand Down Expand Up @@ -304,6 +309,7 @@ impl FileScanConfigBuilder {
constraints: None,
batch_size: None,
expr_adapter_factory: None,
file_prefetch_depth: None,
}
}

Expand Down Expand Up @@ -426,6 +432,13 @@ impl FileScanConfigBuilder {
self
}

/// Set the file prefetch depth (number of files to open concurrently).
/// Higher values can improve I/O throughput but use more memory and file handles.
pub fn with_file_prefetch_depth(mut self, file_prefetch_depth: usize) -> Self {
self.file_prefetch_depth = Some(file_prefetch_depth);
self
}

/// Build the final [`FileScanConfig`] with all the configured settings.
///
/// This method takes ownership of the builder and returns the constructed `FileScanConfig`.
Expand All @@ -446,6 +459,7 @@ impl FileScanConfigBuilder {
new_lines_in_values,
batch_size,
expr_adapter_factory: expr_adapter,
file_prefetch_depth,
} = self;

let constraints = constraints.unwrap_or_default();
Expand All @@ -458,6 +472,7 @@ impl FileScanConfigBuilder {
let file_compression_type =
file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED);
let new_lines_in_values = new_lines_in_values.unwrap_or(false);
let file_prefetch_depth = file_prefetch_depth.unwrap_or(1);

FileScanConfig {
object_store_url,
Expand All @@ -473,6 +488,7 @@ impl FileScanConfigBuilder {
new_lines_in_values,
batch_size,
expr_adapter_factory: expr_adapter,
file_prefetch_depth,
}
}
}
Expand All @@ -494,6 +510,7 @@ impl From<FileScanConfig> for FileScanConfigBuilder {
constraints: Some(config.constraints),
batch_size: config.batch_size,
expr_adapter_factory: config.expr_adapter_factory,
file_prefetch_depth: Some(config.file_prefetch_depth),
}
}
}
Expand Down
Loading
Loading