diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs
index 38456944075fc..a01e4c5a1f72e 100644
--- a/datafusion/catalog-listing/src/table.rs
+++ b/datafusion/catalog-listing/src/table.rs
@@ -674,6 +674,7 @@ impl TableProvider for ListingTable {
             insert_op,
             keep_partition_by_columns,
             file_extension: self.options().format.get_ext(),
+            single_file_output: None, // Use extension heuristic for table inserts
         };
 
         // For writes, we only use user-specified ordering (no file groups to derive from)
diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 1e9f72501e4cc..96c57049fd35d 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -2048,11 +2048,17 @@ impl DataFrame {
                 .build()?
         };
 
+        // Build copy options, including single_file_output if explicitly set
+        let mut copy_options: HashMap<String, String> = HashMap::new();
+        if options.single_file_output {
+            copy_options.insert("single_file_output".to_string(), "true".to_string());
+        }
+
         let plan = LogicalPlanBuilder::copy_to(
             plan,
             path.into(),
             file_type,
-            HashMap::new(),
+            copy_options,
             options.partition_by,
         )?
         .build()?;
@@ -2116,11 +2122,17 @@ impl DataFrame {
                 .build()?
         };
 
+        // Build copy options, including single_file_output if explicitly set
+        let mut copy_options: HashMap<String, String> = HashMap::new();
+        if options.single_file_output {
+            copy_options.insert("single_file_output".to_string(), "true".to_string());
+        }
+
         let plan = LogicalPlanBuilder::copy_to(
             plan,
             path.into(),
             file_type,
-            Default::default(),
+            copy_options,
             options.partition_by,
         )?
         .build()?;
diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs
index 6edf628e2d6d6..da2bddb623476 100644
--- a/datafusion/core/src/dataframe/parquet.rs
+++ b/datafusion/core/src/dataframe/parquet.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use crate::datasource::file_format::{
@@ -84,11 +85,17 @@ impl DataFrame {
                 .build()?
         };
 
+        // Build copy options, including single_file_output if explicitly set
+        let mut copy_options = HashMap::<String, String>::new();
+        if options.single_file_output {
+            copy_options.insert("single_file_output".to_string(), "true".to_string());
+        }
+
         let plan = LogicalPlanBuilder::copy_to(
             plan,
             path.into(),
             file_type,
-            Default::default(),
+            copy_options,
             options.partition_by,
         )?
         .build()?;
@@ -324,4 +331,52 @@ mod tests {
 
         Ok(())
     }
+
+    /// Test that single_file_output works for paths WITHOUT file extensions.
+    /// This verifies the fix for the regression where extension heuristics
+    /// ignored the explicit with_single_file_output(true) setting.
+    #[tokio::test]
+    async fn test_single_file_output_without_extension() -> Result<()> {
+        use arrow::array::Int32Array;
+        use arrow::datatypes::{DataType, Field, Schema};
+        use arrow::record_batch::RecordBatch;
+
+        let ctx = SessionContext::new();
+        let tmp_dir = TempDir::new()?;
+
+        // Path WITHOUT .parquet extension - this is the key scenario
+        let output_path = tmp_dir.path().join("data_no_ext");
+        let output_path_str = output_path.to_str().unwrap();
+
+        let df = ctx.read_batch(RecordBatch::try_new(
+            Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
+            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
+        )?)?;
+
+        // Explicitly request single file output
+        df.write_parquet(
+            output_path_str,
+            DataFrameWriteOptions::new().with_single_file_output(true),
+            None,
+        )
+        .await?;
+
+        // Verify: output should be a FILE, not a directory
+        assert!(
+            output_path.is_file(),
+            "Expected single file at {:?}, but got is_file={}, is_dir={}",
+            output_path,
+            output_path.is_file(),
+            output_path.is_dir()
+        );
+
+        // Verify the file is readable as parquet
+        let file = std::fs::File::open(&output_path)?;
+        let reader = parquet::file::reader::SerializedFileReader::new(file)?;
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+        assert_eq!(metadata.file_metadata().num_rows(), 3);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 47ce519f01289..dd63440ff3359 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1547,6 +1547,7 @@ mod tests {
             insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
             file_extension: "parquet".into(),
+            single_file_output: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
@@ -1638,6 +1639,7 @@ mod tests {
             insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
             file_extension: "parquet".into(),
+            single_file_output: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
@@ -1728,6 +1730,7 @@ mod tests {
             insert_op: InsertOp::Overwrite,
             keep_partition_by_columns: false,
             file_extension: "parquet".into(),
+            single_file_output: None,
         };
         let parquet_sink = Arc::new(ParquetSink::new(
             file_sink_config,
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 94c8fd510a382..c062166312920 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -549,8 +549,30 @@ impl DefaultPhysicalPlanner {
                     }
                 };
 
+                // Parse single_file_output option if explicitly set
+                let single_file_output = match source_option_tuples
+                    .get("single_file_output")
+                    .map(|v| v.trim())
+                {
+                    None => None,
+                    Some("true") => Some(true),
+                    Some("false") => Some(false),
+                    Some(value) => {
+                        return Err(DataFusionError::Configuration(format!(
+                            "provided value for 'single_file_output' was not recognized: \"{value}\""
+                        )));
+                    }
+                };
+
+                // Filter out sink-related options that are not format options
+                let format_options: HashMap<String, String> = source_option_tuples
+                    .iter()
+                    .filter(|(k, _)| k.as_str() != "single_file_output")
+                    .map(|(k, v)| (k.clone(), v.clone()))
+                    .collect();
+
                 let sink_format = file_type_to_format(file_type)?
-                    .create(session_state, source_option_tuples)?;
+                    .create(session_state, &format_options)?;
 
                 // Determine extension based on format extension and compression
                 let file_extension = match sink_format.compression_type() {
@@ -571,6 +593,7 @@ impl DefaultPhysicalPlanner {
                     insert_op: InsertOp::Append,
                     keep_partition_by_columns,
                     file_extension,
+                    single_file_output,
                 };
 
                 let ordering = input_exec.properties().output_ordering().cloned();
diff --git a/datafusion/datasource/src/file_sink_config.rs b/datafusion/datasource/src/file_sink_config.rs
index 643831a1199f8..8c5bc560780f6 100644
--- a/datafusion/datasource/src/file_sink_config.rs
+++ b/datafusion/datasource/src/file_sink_config.rs
@@ -112,6 +112,11 @@ pub struct FileSinkConfig {
     pub keep_partition_by_columns: bool,
     /// File extension without a dot(.)
     pub file_extension: String,
+    /// Override for single file output behavior.
+    /// - `None`: use extension heuristic (path with extension = single file)
+    /// - `Some(true)`: force single file output at exact path
+    /// - `Some(false)`: force directory output with generated filenames
+    pub single_file_output: Option<bool>,
 }
 
 impl FileSinkConfig {
diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs
index bec5b8b0bff0e..921c1f3b41b55 100644
--- a/datafusion/datasource/src/write/demux.rs
+++ b/datafusion/datasource/src/write/demux.rs
@@ -106,8 +106,11 @@ pub(crate) fn start_demuxer_task(
     let file_extension = config.file_extension.clone();
     let base_output_path = config.table_paths[0].clone();
     let task = if config.table_partition_cols.is_empty() {
-        let single_file_output = !base_output_path.is_collection()
-            && base_output_path.file_extension().is_some();
+        // Use explicit single_file_output if set, otherwise fall back to extension heuristic
+        let single_file_output = config.single_file_output.unwrap_or_else(|| {
+            !base_output_path.is_collection()
+                && base_output_path.file_extension().is_some()
+        });
         SpawnedTask::spawn(async move {
             row_count_demuxer(
                 tx,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index 3cfc796700dae..12b683cb15244 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -737,6 +737,8 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
             insert_op,
             keep_partition_by_columns: conf.keep_partition_by_columns,
             file_extension: conf.file_extension.clone(),
+            // For deserialized plans, use extension heuristic (backward compatible)
+            single_file_output: None,
         })
     }
 }
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index b54b7030fc52a..33f7ec6b881e8 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -1475,6 +1475,7 @@ fn roundtrip_json_sink() -> Result<()> {
         insert_op: InsertOp::Overwrite,
         keep_partition_by_columns: true,
         file_extension: "json".into(),
+        single_file_output: None,
     };
     let data_sink = Arc::new(JsonSink::new(
         file_sink_config,
@@ -1513,6 +1514,7 @@ fn roundtrip_csv_sink() -> Result<()> {
         insert_op: InsertOp::Overwrite,
         keep_partition_by_columns: true,
         file_extension: "csv".into(),
+        single_file_output: None,
     };
     let data_sink = Arc::new(CsvSink::new(
         file_sink_config,
@@ -1570,6 +1572,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
         insert_op: InsertOp::Overwrite,
         keep_partition_by_columns: true,
         file_extension: "parquet".into(),
+        single_file_output: None,
     };
     let data_sink = Arc::new(ParquetSink::new(
         file_sink_config,
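A minimal sketch, not part of the patch, of the resolution order the demuxer follows after this change. The helper name `resolve_single_file_output` and its parameters are hypothetical; they mirror the new `FileSinkConfig::single_file_output` override and the extension heuristic that `demux.rs` falls back to when the override is `None`.

/// Hypothetical helper: an explicit Some(true)/Some(false) wins; None falls
/// back to the old "path is not a collection and has an extension" heuristic.
fn resolve_single_file_output(
    explicit: Option<bool>,
    path_is_collection: bool,
    path_has_extension: bool,
) -> bool {
    explicit.unwrap_or(!path_is_collection && path_has_extension)
}

fn main() {
    // A path like "data_no_ext" (no extension) still yields a single file
    // when the caller sets the override explicitly.
    assert!(resolve_single_file_output(Some(true), false, false));
    // Without the override, the original heuristic applies unchanged.
    assert!(!resolve_single_file_output(None, false, false));
    assert!(resolve_single_file_output(None, false, true));
}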