diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index adfb1a6efd00..20cfb339a63c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -432,6 +432,12 @@ config_namespace! { /// Number of files to read in parallel when inferring schema and statistics pub meta_fetch_concurrency: usize, default = 32 + /// Number of files to prefetch (open concurrently) within a single FileStream partition. + /// This controls the balance between I/O and CPU utilization. Higher values can improve + /// I/O throughput by opening more files in parallel while processing current files, + /// but use more memory and file handles. Setting to 1 maintains current behavior. + pub file_prefetch_depth: usize, default = 1 + /// Guarantees a minimum level of output files running in parallel. /// RecordBatches will be distributed in round robin fashion to each /// parallel writer. Each writer is closed and a new file opened once diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index de58740bcd05..0116c06ccf50 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1286,6 +1286,9 @@ impl TableProvider for ListingTable { .with_output_ordering(output_ordering) .with_table_partition_cols(table_partition_cols) .with_expr_adapter(self.expr_adapter_factory.clone()) + .with_file_prefetch_depth( + state.config_options().execution.file_prefetch_depth, + ) .build(), ) .await?; diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 79f0082d514f..14e0540c8de2 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -196,6 +196,10 @@ pub struct FileScanConfig { /// Expression adapter used to adapt filters and projections that are pushed down into the scan /// from the logical schema to the physical schema of the file. pub expr_adapter_factory: Option>, + /// Number of files to prefetch (open concurrently) within a single FileStream partition. + /// This controls the balance between I/O and CPU utilization. Higher values can improve + /// I/O throughput but use more memory and file handles. Defaults to 1. + pub file_prefetch_depth: usize, } /// A builder for [`FileScanConfig`]'s. @@ -275,6 +279,7 @@ pub struct FileScanConfigBuilder { new_lines_in_values: Option, batch_size: Option, expr_adapter_factory: Option>, + file_prefetch_depth: Option, } impl FileScanConfigBuilder { @@ -304,6 +309,7 @@ impl FileScanConfigBuilder { constraints: None, batch_size: None, expr_adapter_factory: None, + file_prefetch_depth: None, } } @@ -426,6 +432,13 @@ impl FileScanConfigBuilder { self } + /// Set the file prefetch depth (number of files to open concurrently). + /// Higher values can improve I/O throughput but use more memory and file handles. + pub fn with_file_prefetch_depth(mut self, file_prefetch_depth: usize) -> Self { + self.file_prefetch_depth = Some(file_prefetch_depth); + self + } + /// Build the final [`FileScanConfig`] with all the configured settings. /// /// This method takes ownership of the builder and returns the constructed `FileScanConfig`. 
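(Reviewer note, not part of the patch: a minimal sketch of how the new builder knob is intended to be used, mirroring the `FileScanConfigBuilder` calls in the test harness later in this diff. The helper name, the `url` / `schema` / `source` / `files` parameters, and the `use` paths are illustrative assumptions and may differ between DataFusion versions.)

```rust
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_execution::object_store::ObjectStoreUrl;

/// Hypothetical helper: build a scan config that keeps up to `depth` files
/// open ahead of the file currently being scanned.
fn scan_config_with_prefetch(
    url: ObjectStoreUrl,
    schema: SchemaRef,
    source: Arc<dyn FileSource>,
    files: FileGroup,
    depth: usize,
) -> FileScanConfig {
    FileScanConfigBuilder::new(url, schema, source)
        .with_file_group(files)
        // Added by this patch: how many files FileStream may open concurrently.
        .with_file_prefetch_depth(depth)
        .build()
}
```

A depth of 1 reproduces the pre-patch behavior of opening a single file ahead; larger values trade extra memory and file handles for more overlap between opening upcoming files and decoding the current one.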
@@ -446,6 +459,7 @@ impl FileScanConfigBuilder { new_lines_in_values, batch_size, expr_adapter_factory: expr_adapter, + file_prefetch_depth, } = self; let constraints = constraints.unwrap_or_default(); @@ -458,6 +472,7 @@ impl FileScanConfigBuilder { let file_compression_type = file_compression_type.unwrap_or(FileCompressionType::UNCOMPRESSED); let new_lines_in_values = new_lines_in_values.unwrap_or(false); + let file_prefetch_depth = file_prefetch_depth.unwrap_or(1); FileScanConfig { object_store_url, @@ -473,6 +488,7 @@ impl FileScanConfigBuilder { new_lines_in_values, batch_size, expr_adapter_factory: expr_adapter, + file_prefetch_depth, } } } @@ -494,6 +510,7 @@ impl From for FileScanConfigBuilder { constraints: Some(config.constraints), batch_size: config.batch_size, expr_adapter_factory: config.expr_adapter_factory, + file_prefetch_depth: Some(config.file_prefetch_depth), } } } diff --git a/datafusion/datasource/src/file_stream.rs b/datafusion/datasource/src/file_stream.rs index 54690ba49649..c08a5d4e9a70 100644 --- a/datafusion/datasource/src/file_stream.rs +++ b/datafusion/datasource/src/file_stream.rs @@ -67,6 +67,8 @@ pub struct FileStream { baseline_metrics: BaselineMetrics, /// Describes the behavior of the `FileStream` if file opening or scanning fails on_error: OnError, + /// Number of files to prefetch (open concurrently) + file_prefetch_depth: usize, } impl FileStream { @@ -99,6 +101,7 @@ impl FileStream { file_stream_metrics: FileStreamMetrics::new(metrics, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), on_error: OnError::Fail, + file_prefetch_depth: config.file_prefetch_depth, }) } @@ -133,6 +136,36 @@ impl FileStream { ) } + /// Fill the prefetch queue up to the configured depth + fn fill_prefetch_queue( + &mut self, + queue: &mut VecDeque<(NextOpen, Vec)>, + ) { + while queue.len() < self.file_prefetch_depth && !self.file_iter.is_empty() { + match self.start_next_file().transpose() { + Ok(Some((future, partition_values))) => { + queue.push_back((NextOpen::Pending(future), partition_values)); + } + Ok(None) => break, + Err(_) => break, + } + } + } + + /// Poll all pending futures in the prefetch queue + fn poll_prefetch_queue( + queue: &mut VecDeque<(NextOpen, Vec)>, + cx: &mut Context<'_>, + ) { + for (next_open, _) in queue.iter_mut() { + if let NextOpen::Pending(f) = next_open { + if let Poll::Ready(reader) = f.as_mut().poll(cx) { + *next_open = NextOpen::Ready(reader); + } + } + } + } + fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll>> { loop { match &mut self.state { @@ -144,6 +177,7 @@ impl FileStream { self.state = FileStreamState::Open { future, partition_values, + prefetch_queue: VecDeque::new(), } } Ok(None) => return Poll::Ready(None), @@ -156,46 +190,60 @@ impl FileStream { FileStreamState::Open { future, partition_values, + prefetch_queue, } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { let partition_values = mem::take(partition_values); // include time needed to start opening in `start_next_file` self.file_stream_metrics.time_opening.stop(); - let next = self.start_next_file().transpose(); + + // Fill the prefetch queue (reuse existing queue from previous scan) + let mut prefetch_queue = mem::take(prefetch_queue); + self.fill_prefetch_queue(&mut prefetch_queue); + self.file_stream_metrics.time_scanning_until_data.start(); self.file_stream_metrics.time_scanning_total.start(); - match next { - Ok(Some((next_future, next_partition_values))) => { - self.state = FileStreamState::Scan { - partition_values, - reader, - 
next: Some(( - NextOpen::Pending(next_future), - next_partition_values, - )), - }; - } - Ok(None) => { - self.state = FileStreamState::Scan { - reader, - partition_values, - next: None, - }; - } - Err(e) => { - self.state = FileStreamState::Error; - return Poll::Ready(Some(Err(e))); - } - } + self.state = FileStreamState::Scan { + partition_values, + reader, + prefetch_queue, + }; } Err(e) => { self.file_stream_metrics.file_open_errors.add(1); match self.on_error { OnError::Skip => { self.file_stream_metrics.time_opening.stop(); - self.state = FileStreamState::Idle + + // Try to open the next file from the prefetch queue + if let Some((future, partition_values)) = + prefetch_queue.pop_front() + { + let remaining_queue = mem::take(prefetch_queue); + match future { + NextOpen::Pending(future) => { + self.state = FileStreamState::Open { + future, + partition_values, + prefetch_queue: remaining_queue, + } + } + NextOpen::Ready(reader) => { + self.state = FileStreamState::Open { + future: Box::pin(std::future::ready( + reader, + )), + partition_values, + prefetch_queue: remaining_queue, + } + } + } + } else { + // No more files in queue, go to Idle to check file_iter + self.state = FileStreamState::Idle + } } OnError::Fail => { self.state = FileStreamState::Error; @@ -207,14 +255,40 @@ impl FileStream { FileStreamState::Scan { reader, partition_values, - next, + prefetch_queue, } => { - // We need to poll the next `FileOpenFuture` here to drive it forward - if let Some((next_open_future, _)) = next { - if let NextOpen::Pending(f) = next_open_future { - if let Poll::Ready(reader) = f.as_mut().poll(cx) { - *next_open_future = NextOpen::Ready(reader); + // Poll all pending futures in the prefetch queue to drive them forward + Self::poll_prefetch_queue(prefetch_queue, cx); + + // Fill the prefetch queue if needed - we need to do this before borrowing other parts + let file_prefetch_depth = self.file_prefetch_depth; + let file_iter = &mut self.file_iter; + let file_opener = Arc::clone(&self.file_opener); + + while prefetch_queue.len() < file_prefetch_depth + && !file_iter.is_empty() + { + let part_file = match file_iter.pop_front() { + Some(file) => file, + None => break, + }; + + let file_meta = FileMeta { + object_meta: part_file.object_meta.clone(), + range: part_file.range.clone(), + extensions: part_file.extensions.clone(), + metadata_size_hint: part_file.metadata_size_hint, + }; + + let partition_values = part_file.partition_values.clone(); + match file_opener.open(file_meta, part_file) { + Ok(future) => { + prefetch_queue.push_back(( + NextOpen::Pending(future), + partition_values, + )); } + Err(_) => break, } } match ready!(reader.poll_next_unpin(cx)) { @@ -254,15 +328,21 @@ impl FileStream { match self.on_error { // If `OnError::Skip` we skip the file as soon as we hit the first error - OnError::Skip => match mem::take(next) { - Some((future, partition_values)) => { + OnError::Skip => { + if let Some((future, partition_values)) = + prefetch_queue.pop_front() + { self.file_stream_metrics.time_opening.start(); + // Move the remaining queue to the Open state + let remaining_queue = mem::take(prefetch_queue); + match future { NextOpen::Pending(future) => { self.state = FileStreamState::Open { future, partition_values, + prefetch_queue: remaining_queue, } } NextOpen::Ready(reader) => { @@ -271,12 +351,15 @@ impl FileStream { reader, )), partition_values, + prefetch_queue: remaining_queue, } } } + } else { + // Prefetch queue is empty, go to Idle to check file_iter + self.state = 
FileStreamState::Idle; } - None => return Poll::Ready(None), - }, + } OnError::Fail => { self.state = FileStreamState::Error; return Poll::Ready(Some(Err(err))); @@ -287,28 +370,33 @@ impl FileStream { self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); - match mem::take(next) { - Some((future, partition_values)) => { - self.file_stream_metrics.time_opening.start(); + if let Some((future, partition_values)) = + prefetch_queue.pop_front() + { + self.file_stream_metrics.time_opening.start(); - match future { - NextOpen::Pending(future) => { - self.state = FileStreamState::Open { - future, - partition_values, - } + // Move the remaining queue to the Open state + let remaining_queue = mem::take(prefetch_queue); + + match future { + NextOpen::Pending(future) => { + self.state = FileStreamState::Open { + future, + partition_values, + prefetch_queue: remaining_queue, } - NextOpen::Ready(reader) => { - self.state = FileStreamState::Open { - future: Box::pin(std::future::ready( - reader, - )), - partition_values, - } + } + NextOpen::Ready(reader) => { + self.state = FileStreamState::Open { + future: Box::pin(std::future::ready(reader)), + partition_values, + prefetch_queue: remaining_queue, } } } - None => return Poll::Ready(None), + } else { + // Prefetch queue is empty, go to Idle to check file_iter + self.state = FileStreamState::Idle; } } } @@ -387,6 +475,8 @@ pub enum FileStreamState { future: FileOpenFuture, /// The partition values for this file partition_values: Vec, + /// Queue of prefetched files to preserve across state transitions + prefetch_queue: VecDeque<(NextOpen, Vec)>, }, /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`] /// returned by [`FileOpener::open`] @@ -395,11 +485,10 @@ pub enum FileStreamState { partition_values: Vec, /// The reader instance reader: BoxStream<'static, Result>, - /// A [`FileOpenFuture`] for the next file to be processed, - /// and its corresponding partition column values, if any. - /// This allows the next file to be opened in parallel while the - /// current file is read. - next: Option<(NextOpen, Vec)>, + /// Queue of [`FileOpenFuture`]s for upcoming files to be processed, + /// and their corresponding partition column values. This allows + /// multiple files to be opened in parallel while the current file is read. 
+ prefetch_queue: VecDeque<(NextOpen, Vec)>, }, /// Encountered an error Error, @@ -584,6 +673,8 @@ mod tests { on_error: OnError, /// Mock `FileOpener` opener: TestOpener, + /// Number of files to prefetch + file_prefetch_depth: Option, } impl FileStreamTest { @@ -630,6 +721,12 @@ mod tests { self } + /// Specify the file prefetch depth + pub fn with_file_prefetch_depth(mut self, depth: usize) -> Self { + self.file_prefetch_depth = Some(depth); + self + } + /// Collect the results of the `FileStream` pub async fn result(self) -> Result> { let file_schema = self @@ -656,14 +753,19 @@ mod tests { let on_error = self.on_error; - let config = FileScanConfigBuilder::new( + let mut config_builder = FileScanConfigBuilder::new( ObjectStoreUrl::parse("test:///").unwrap(), file_schema, Arc::new(MockSource::default()), ) .with_file_group(file_group) - .with_limit(self.limit) - .build(); + .with_limit(self.limit); + + if let Some(depth) = self.file_prefetch_depth { + config_builder = config_builder.with_file_prefetch_depth(depth); + } + + let config = config_builder.build(); let metrics_set = ExecutionPlanMetricsSet::new(); let file_stream = FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set) @@ -983,4 +1085,293 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_prefetch_with_errors() { + // Test prefetch behavior when some files have errors - this should behave the same as the original error test + let batches = FileStreamTest::new() + .with_records(vec![make_partition(1)]) + .with_num_files(3) + .with_file_prefetch_depth(2) + .with_on_error(OnError::Skip) + .with_open_errors(vec![1]) // File 1 will fail to open + .result() + .await + .unwrap(); + + // Should get 2 batches (from files 0 and 2, skipping 1) + assert_eq!(batches.len(), 2); + } + + #[tokio::test] + async fn test_prefetch_depth_greater_than_num_files() { + // Test with prefetch depth larger than number of files - should work correctly + let batches = FileStreamTest::new() + .with_records(vec![make_partition(2)]) + .with_num_files(3) + .with_file_prefetch_depth(10) // Much larger than num_files + .result() + .await + .unwrap(); + + // Should get 3 batches (one from each file), prefetch depth shouldn't cause issues + assert_eq!(batches.len(), 3); + } + + #[tokio::test] + async fn test_prefetch_depth_0() { + // Test with prefetch depth = 0 (no prefetching) + let batches = FileStreamTest::new() + .with_records(vec![make_partition(2), make_partition(3)]) + .with_num_files(5) + .with_file_prefetch_depth(0) + .result() + .await + .unwrap(); + + // Should get 10 batches (5 files * 2 batches each) + assert_eq!(batches.len(), 10); + + // Verify total row count: 5 files * (2 rows + 3 rows) = 25 rows + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 25); + } + + #[tokio::test] + async fn test_prefetch_depth_1() { + // Test with prefetch depth = 1 (default behavior) + let batches = FileStreamTest::new() + .with_records(vec![make_partition(2), make_partition(3)]) + .with_num_files(5) + .with_file_prefetch_depth(1) + .result() + .await + .unwrap(); + + // Should get 10 batches (5 files * 2 batches each) + assert_eq!(batches.len(), 10); + } + + #[tokio::test] + async fn test_prefetch_depth_2() { + // Test with prefetch depth = 2 + let batches = FileStreamTest::new() + .with_records(vec![make_partition(2), make_partition(3)]) + .with_num_files(5) + .with_file_prefetch_depth(2) + .result() + .await + .unwrap(); + + // Should get 10 batches (5 files * 2 batches each) + 
assert_eq!(batches.len(), 10); + } + + #[tokio::test] + async fn test_prefetch_depth_5() { + // Test with prefetch depth = 5 + let batches = FileStreamTest::new() + .with_records(vec![make_partition(1)]) + .with_num_files(10) + .with_file_prefetch_depth(5) + .result() + .await + .unwrap(); + + // Should get 10 batches (one from each file) + assert_eq!(batches.len(), 10); + } + + #[tokio::test] + async fn test_prefetch_depth_10() { + // Test with prefetch depth = 10 + let batches = FileStreamTest::new() + .with_records(vec![make_partition(3)]) + .with_num_files(20) + .with_file_prefetch_depth(10) + .result() + .await + .unwrap(); + + // Should get 20 batches (one from each file) + assert_eq!(batches.len(), 20); + } + + #[tokio::test] + async fn test_prefetch_with_limit() -> Result<()> { + // With no limit we get >23 rows (10 files * 5 rows each = 50) + let batches = FileStreamTest::new() + .with_records(vec![make_partition(5)]) + .with_num_files(10) + .with_file_prefetch_depth(5) + .result() + .await?; + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert!(total_rows > 23); + + // With a limit of 23 we should get exactly 23 rows + let batches = FileStreamTest::new() + .with_records(vec![make_partition(5)]) + .with_num_files(10) + .with_file_prefetch_depth(5) + .with_limit(Some(23)) // Limit in the middle of 5th file + .result() + .await?; + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 23); + + Ok(()) + } + + #[tokio::test] + async fn test_prefetch_with_limit_at_file_boundary() { + // Test prefetch with limit that falls exactly on a file boundary + let batches = FileStreamTest::new() + .with_records(vec![make_partition(5)]) + .with_num_files(10) + .with_file_prefetch_depth(3) + .with_limit(Some(15)) // Exactly 3 files worth of data + .result() + .await + .unwrap(); + + let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total_rows, 15); + } + + #[tokio::test] + async fn test_prefetch_error_at_queue_start() { + // Test error at the beginning of prefetch queue + let batches = FileStreamTest::new() + .with_records(vec![make_partition(2), make_partition(3)]) + .with_num_files(5) + .with_file_prefetch_depth(3) + .with_on_error(OnError::Skip) + .with_open_errors(vec![0]) // First file fails + .result() + .await + .unwrap(); + + // Should get 8 batches (4 files * 2 batches each, skipping file 0) + assert_eq!(batches.len(), 8); + } + + #[tokio::test] + async fn test_prefetch_error_at_queue_middle() { + // Test error in the middle of prefetch queue + let batches = FileStreamTest::new() + .with_records(vec![make_partition(2), make_partition(3)]) + .with_num_files(5) + .with_file_prefetch_depth(3) + .with_on_error(OnError::Skip) + .with_open_errors(vec![2]) // Middle file fails + .result() + .await + .unwrap(); + + // Should get 8 batches (4 files * 2 batches each, skipping file 2) + assert_eq!(batches.len(), 8); + } + + #[tokio::test] + async fn test_prefetch_error_at_queue_end() { + // Test error at the end of prefetch queue + let batches = FileStreamTest::new() + .with_records(vec![make_partition(2), make_partition(3)]) + .with_num_files(5) + .with_file_prefetch_depth(3) + .with_on_error(OnError::Skip) + .with_open_errors(vec![4]) // Last file fails + .result() + .await + .unwrap(); + + // Should get 8 batches (4 files * 2 batches each, skipping file 4) + assert_eq!(batches.len(), 8); + } + + #[tokio::test] + async fn test_prefetch_multiple_errors_in_queue() { + // Test multiple errors within the 
prefetch queue + let batches = FileStreamTest::new() + .with_records(vec![make_partition(2), make_partition(1)]) + .with_num_files(10) + .with_file_prefetch_depth(5) + .with_on_error(OnError::Skip) + .with_open_errors(vec![1, 3, 7]) // Multiple files fail + .result() + .await + .unwrap(); + + // Should get 14 batches (7 files * 2 batches each, skipping 3 files) + assert_eq!(batches.len(), 14); + } + + #[tokio::test] + async fn test_prefetch_scan_error_in_queue() { + // Test scan errors (not open errors) with prefetch + let batches = FileStreamTest::new() + .with_records(vec![make_partition(2), make_partition(3)]) + .with_num_files(5) + .with_file_prefetch_depth(3) + .with_on_error(OnError::Skip) + .with_scan_errors(vec![1]) // File 1 fails during scanning + .result() + .await + .unwrap(); + + // Should get 8 batches (4 files * 2 batches each, skipping file 1) + assert_eq!(batches.len(), 8); + } + + #[tokio::test] + async fn test_prefetch_mixed_errors() { + // Test combination of open and scan errors with prefetch + let batches = FileStreamTest::new() + .with_records(vec![make_partition(2), make_partition(3)]) + .with_num_files(8) + .with_file_prefetch_depth(4) + .with_on_error(OnError::Skip) + .with_open_errors(vec![1, 5]) + .with_scan_errors(vec![3]) + .result() + .await + .unwrap(); + + // Should get 10 batches (5 files * 2 batches each, skipping 3 files) + assert_eq!(batches.len(), 10); + } + + #[tokio::test] + async fn test_prefetch_many_files() { + // Test with many files to ensure prefetch scales + let batches = FileStreamTest::new() + .with_records(vec![make_partition(1)]) + .with_num_files(50) + .with_file_prefetch_depth(8) + .result() + .await + .unwrap(); + + // Should get 50 batches (one from each file) + assert_eq!(batches.len(), 50); + } + + #[tokio::test] + async fn test_prefetch_many_files_with_errors() { + // Test many files with some errors scattered throughout + let batches = FileStreamTest::new() + .with_records(vec![make_partition(2), make_partition(1)]) + .with_num_files(50) + .with_file_prefetch_depth(7) + .with_on_error(OnError::Skip) + .with_open_errors(vec![5, 15, 25, 35, 45]) // 5 files fail + .result() + .await + .unwrap(); + + // Should get 90 batches (45 files * 2 batches each, skipping 5 files) + assert_eq!(batches.len(), 90); + } } diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 670992633bb8..34c0728d5ac7 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -219,6 +219,7 @@ datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics true datafusion.execution.enable_recursive_ctes true datafusion.execution.enforce_batch_size_in_joins false +datafusion.execution.file_prefetch_depth 1 datafusion.execution.keep_partition_by_columns false datafusion.execution.listing_table_factory_infer_partitions true datafusion.execution.listing_table_ignore_subdirectory true @@ -335,6 +336,7 @@ datafusion.execution.coalesce_batches true When set to true, record batches will datafusion.execution.collect_statistics true Should DataFusion collect statistics when first creating a table. Has no effect after the table is created. Applies to the default `ListingTableProvider` in DataFusion. Defaults to true. 
 datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs
 datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower.
+datafusion.execution.file_prefetch_depth 1 Number of files to prefetch (open concurrently) within a single FileStream partition. This controls the balance between I/O and CPU utilization. Higher values can improve I/O throughput by opening more files in parallel while processing current files, but use more memory and file handles. Setting to 1 maintains current behavior.
 datafusion.execution.keep_partition_by_columns false Should DataFusion keep the columns used for partition_by in the output RecordBatches
 datafusion.execution.listing_table_factory_infer_partitions true Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema).
 datafusion.execution.listing_table_ignore_subdirectory true Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`).
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 2069055baafb..09edbb7772b1 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -115,6 +115,7 @@ The following configuration settings are available:
 | datafusion.execution.sort_spill_reservation_bytes | 10485760 | Specifies the reserved memory for each spillable sort operation to facilitate an in-memory merge. When a sort operation spills to disk, the in-memory data must be sorted and merged before being written to a file. This setting reserves a specific amount of memory for that in-memory sort/merge process. Note: This setting is irrelevant if the sort operation cannot spill (i.e., if there's no `DiskManager` configured). |
 | datafusion.execution.sort_in_place_threshold_bytes | 1048576 | When sorting, below what size should data be concatenated and sorted in a single RecordBatch rather than sorted in batches and merged. |
 | datafusion.execution.meta_fetch_concurrency | 32 | Number of files to read in parallel when inferring schema and statistics |
+| datafusion.execution.file_prefetch_depth | 1 | Number of files to prefetch (open concurrently) within a single FileStream partition. This controls the balance between I/O and CPU utilization. Higher values can improve I/O throughput by opening more files in parallel while processing current files, but use more memory and file handles. Setting to 1 maintains current behavior. |
 | datafusion.execution.minimum_parallel_output_files | 4 | Guarantees a minimum level of output files running in parallel. RecordBatches will be distributed in round robin fashion to each parallel writer. Each writer is closed and a new file opened once soft_max_rows_per_output_file is reached. |
 | datafusion.execution.soft_max_rows_per_output_file | 50000000 | Target number of rows in output files when writing multiple. This is a soft max, so it can be exceeded slightly. There also will be one file smaller than the limit if the total number of rows written is not roughly divisible by the soft max |
 | datafusion.execution.max_buffered_batches_per_output_file | 2 | This is the maximum number of RecordBatches buffered for each output file being worked. Higher values can potentially give faster write performance at the cost of higher peak memory consumption |
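(Reviewer note, not part of the patch: a minimal, hedged sketch of how the new option would typically be set at the session level. Only the `execution.file_prefetch_depth` field and its SQL key `datafusion.execution.file_prefetch_depth` come from this patch; the `datafusion` prelude re-exports and the tokio runtime are assumptions about the surrounding application.)

```rust
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Open up to 4 files ahead within each FileStream partition for all
    // ListingTable scans planned from this session.
    let mut config = SessionConfig::new();
    config.options_mut().execution.file_prefetch_depth = 4;
    let ctx = SessionContext::new_with_config(config);

    // The same option can also be changed at runtime through SQL.
    ctx.sql("SET datafusion.execution.file_prefetch_depth = 8")
        .await?;
    Ok(())
}
```

As the option documentation above notes, values greater than 1 buy more I/O overlap at the cost of additional memory and open file handles, while the default of 1 keeps the pre-patch behavior.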