diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs
index 0d060db3bf147..fe760760eef3f 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -107,6 +107,8 @@ impl DataFrameWriteOptions {
     }
 
     /// Set the single_file_output value to true or false
+    ///
+    /// When set to true, an output file will always be created even if the DataFrame is empty
     pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
         self.single_file_output = single_file_output;
         self
diff --git a/datafusion/core/src/datasource/file_format/arrow.rs b/datafusion/core/src/datasource/file_format/arrow.rs
index 8701f96eb3b84..338de76b1353b 100644
--- a/datafusion/core/src/datasource/file_format/arrow.rs
+++ b/datafusion/core/src/datasource/file_format/arrow.rs
@@ -17,3 +17,96 @@
 //! Re-exports the [`datafusion_datasource_arrow::file_format`] module, and contains tests for it.
 
 pub use datafusion_datasource_arrow::file_format::*;
+
+#[cfg(test)]
+mod tests {
+    use futures::StreamExt;
+    use std::sync::Arc;
+
+    use arrow::array::{Int64Array, StringArray};
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::record_batch::RecordBatch;
+    use datafusion_common::Result;
+
+    use crate::execution::options::ArrowReadOptions;
+    use crate::prelude::SessionContext;
+
+    #[tokio::test]
+    async fn test_write_empty_arrow_from_sql() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_sql.arrow", tmp_dir.path().to_string_lossy());
+
+        ctx.sql(&format!(
+            "COPY (SELECT CAST(1 AS BIGINT) AS id LIMIT 0) TO '{path}' STORED AS ARROW",
+        ))
+        .await?
+        .collect()
+        .await?;
+
+        assert!(std::path::Path::new(&path).exists());
+
+        let read_df = ctx.read_arrow(&path, ArrowReadOptions::default()).await?;
+        let stream = read_df.execute_stream().await?;
+
+        assert_eq!(stream.schema().fields().len(), 1);
+        assert_eq!(stream.schema().field(0).name(), "id");
+
+        let results: Vec<_> = stream.collect().await;
+        let total_rows: usize = results
+            .iter()
+            .filter_map(|r| r.as_ref().ok())
+            .map(|b| b.num_rows())
+            .sum();
+        assert_eq!(total_rows, 0);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_write_empty_arrow_from_record_batch() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let empty_batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(Int64Array::from(Vec::<i64>::new())),
+                Arc::new(StringArray::from(Vec::<Option<&str>>::new())),
+            ],
+        )?;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_batch.arrow", tmp_dir.path().to_string_lossy());
+
+        ctx.register_batch("empty_table", empty_batch)?;
+
+        ctx.sql(&format!("COPY empty_table TO '{path}' STORED AS ARROW"))
+            .await?
+            .collect()
+            .await?;
+
+        assert!(std::path::Path::new(&path).exists());
+
+        let read_df = ctx.read_arrow(&path, ArrowReadOptions::default()).await?;
+        let stream = read_df.execute_stream().await?;
+
+        assert_eq!(stream.schema().fields().len(), 2);
+        assert_eq!(stream.schema().field(0).name(), "id");
+        assert_eq!(stream.schema().field(1).name(), "name");
+
+        let results: Vec<_> = stream.collect().await;
+        let total_rows: usize = results
+            .iter()
+            .filter_map(|r| r.as_ref().ok())
+            .map(|b| b.num_rows())
+            .sum();
+        assert_eq!(total_rows, 0);
+
+        Ok(())
+    }
+}
diff --git a/datafusion/core/src/datasource/file_format/csv.rs b/datafusion/core/src/datasource/file_format/csv.rs
index 719bc4361ac91..aa226144a4af1 100644
--- a/datafusion/core/src/datasource/file_format/csv.rs
+++ b/datafusion/core/src/datasource/file_format/csv.rs
@@ -1537,6 +1537,68 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_write_empty_csv_from_sql() -> Result<()> {
+        let ctx = SessionContext::new();
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_sql.csv", tmp_dir.path().to_string_lossy());
+        let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
+        df.write_csv(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        assert!(std::path::Path::new(&path).exists());
+
+        let read_df = ctx
+            .read_csv(&path, CsvReadOptions::default().has_header(true))
+            .await?;
+        let stream = read_df.execute_stream().await?;
+        assert_eq!(stream.schema().fields().len(), 1);
+        assert_eq!(stream.schema().field(0).name(), "id");
+
+        let results: Vec<_> = stream.collect().await;
+        assert_eq!(results.len(), 0);
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_write_empty_csv_from_record_batch() -> Result<()> {
+        let ctx = SessionContext::new();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let empty_batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(arrow::array::Int64Array::from(Vec::<i64>::new())),
+                Arc::new(StringArray::from(Vec::<Option<&str>>::new())),
+            ],
+        )?;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_batch.csv", tmp_dir.path().to_string_lossy());
+
+        // Write the empty RecordBatch
+        let df = ctx.read_batch(empty_batch.clone())?;
+        df.write_csv(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expect the file to exist even though no rows were written
+        assert!(std::path::Path::new(&path).exists());
+
+        let read_df = ctx
+            .read_csv(&path, CsvReadOptions::default().has_header(true))
+            .await?;
+        let stream = read_df.execute_stream().await?;
+        assert_eq!(stream.schema().fields().len(), 2);
+        assert_eq!(stream.schema().field(0).name(), "id");
+        assert_eq!(stream.schema().field(1).name(), "name");
+
+        let results: Vec<_> = stream.collect().await;
+        assert_eq!(results.len(), 0);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_infer_schema_with_zero_max_records() -> Result<()> {
         let session_ctx = SessionContext::new();
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs
index 4d5ed34399693..cb2e9d787ee92 100644
--- a/datafusion/core/src/datasource/file_format/json.rs
+++ b/datafusion/core/src/datasource/file_format/json.rs
@@ -349,4 +349,46 @@ mod tests {
     fn fmt_batches(batches: &[RecordBatch]) -> String {
         pretty::pretty_format_batches(batches).unwrap().to_string()
     }
+
+    #[tokio::test]
+    async fn test_write_empty_json_from_sql() -> Result<()> {
+        let ctx = SessionContext::new();
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_sql.json", tmp_dir.path().to_string_lossy());
+        let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
+        df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expect the file to exist and be empty
+        assert!(std::path::Path::new(&path).exists());
+        let metadata = std::fs::metadata(&path)?;
+        assert_eq!(metadata.len(), 0);
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_write_empty_json_from_record_batch() -> Result<()> {
+        let ctx = SessionContext::new();
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int64, false),
+            Field::new("name", DataType::Utf8, true),
+        ]));
+        let empty_batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![
+                Arc::new(arrow::array::Int64Array::from(Vec::<i64>::new())),
+                Arc::new(arrow::array::StringArray::from(Vec::<Option<&str>>::new())),
+            ],
+        )?;
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_batch.json", tmp_dir.path().to_string_lossy());
+        let df = ctx.read_batch(empty_batch.clone())?;
+        df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expect the file to exist and be empty
+        assert!(std::path::Path::new(&path).exists());
+        let metadata = std::fs::metadata(&path)?;
+        assert_eq!(metadata.len(), 0);
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 44cf09c1ae46e..47ce519f01289 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -1366,6 +1366,28 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_write_empty_parquet_from_sql() -> Result<()> {
+        let ctx = SessionContext::new();
+
+        let tmp_dir = tempfile::TempDir::new()?;
+        let path = format!("{}/empty_sql.parquet", tmp_dir.path().to_string_lossy());
+        let df = ctx.sql("SELECT CAST(1 AS INT) AS id LIMIT 0").await?;
+        df.write_parquet(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
+            .await?;
+        // Expect the file to exist even though no rows were written
+        assert!(std::path::Path::new(&path).exists());
+        let read_df = ctx.read_parquet(&path, ParquetReadOptions::new()).await?;
+        let stream = read_df.execute_stream().await?;
+        assert_eq!(stream.schema().fields().len(), 1);
+        assert_eq!(stream.schema().field(0).name(), "id");
+
+        let results: Vec<_> = stream.collect().await;
+        assert_eq!(results.len(), 0);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
         // expected kv metadata without schema
diff --git a/datafusion/datasource/src/write/demux.rs b/datafusion/datasource/src/write/demux.rs
index 5e4962aa48b18..bec5b8b0bff0e 100644
--- a/datafusion/datasource/src/write/demux.rs
+++ b/datafusion/datasource/src/write/demux.rs
@@ -191,7 +191,11 @@ async fn row_count_demuxer(
         part_idx += 1;
     }
 
+    let schema = input.schema();
+    let mut is_batch_received = false;
+
     while let Some(rb) = input.next().await.transpose()? {
+        is_batch_received = true;
         // ensure we have at least minimum_parallel_files open
         if open_file_streams.len() < minimum_parallel_files {
             open_file_streams.push(create_new_file_stream(
@@ -228,6 +232,19 @@ async fn row_count_demuxer(
 
         next_send_steam = (next_send_steam + 1) % minimum_parallel_files;
     }
+
+    // if no batch was received but a single output file was requested, send an empty batch
+    // so that a file is still created
+    if single_file_output && !is_batch_received {
+        open_file_streams
+            .first_mut()
+            .ok_or_else(|| internal_datafusion_err!("Expected a single output file"))?
+            .send(RecordBatch::new_empty(schema))
+            .await
+            .map_err(|_| {
+                exec_datafusion_err!("Error sending empty RecordBatch to file stream!")
+            })?;
+    }
+
     Ok(())
 }
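
Note: the following is an illustrative sketch, not part of the patch above. It shows how a caller might rely on the behavior documented on with_single_file_output: when set to true, an output file is created even if the DataFrame produces no rows. The helper name, the output path, and the choice of write_csv are example assumptions only.

    use datafusion::dataframe::DataFrameWriteOptions;
    use datafusion::error::Result;
    use datafusion::prelude::*;

    // Hypothetical helper demonstrating the documented behavior.
    async fn write_empty_single_file(ctx: &SessionContext, path: &str) -> Result<()> {
        // An empty result set: one BIGINT column, zero rows
        let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;

        // With single_file_output set, the row-count demuxer sends an empty
        // RecordBatch downstream, so exactly one output file is still created
        df.write_csv(
            path,
            DataFrameWriteOptions::new().with_single_file_output(true),
            None,
        )
        .await?;

        assert!(std::path::Path::new(path).exists());
        Ok(())
    }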