Merged
2 changes: 2 additions & 0 deletions datafusion/core/src/dataframe/mod.rs
@@ -107,6 +107,8 @@ impl DataFrameWriteOptions {
}

/// Set the single_file_output value to true or false
///
/// When set to true, an output file will always be created even if the DataFrame is empty
pub fn with_single_file_output(mut self, single_file_output: bool) -> Self {
self.single_file_output = single_file_output;
self
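
For context, a minimal sketch (not part of this PR) of how the option could be exercised. The output path and the tokio main harness are hypothetical; the write_csv call mirrors the three-argument signature used in the tests below.

use datafusion::dataframe::DataFrameWriteOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();

    // A DataFrame with a known schema but zero rows.
    let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;

    // With single_file_output set, an output file is still created even
    // though the DataFrame is empty.
    df.write_csv(
        "/tmp/empty.csv", // hypothetical output path
        DataFrameWriteOptions::new().with_single_file_output(true),
        None,
    )
    .await?;
    Ok(())
}
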
93 changes: 93 additions & 0 deletions datafusion/core/src/datasource/file_format/arrow.rs
@@ -17,3 +17,96 @@

//! Re-exports the [`datafusion_datasource_arrow::file_format`] module, and contains tests for it.
pub use datafusion_datasource_arrow::file_format::*;

#[cfg(test)]
mod tests {
use futures::StreamExt;
use std::sync::Arc;

use arrow::array::{Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;

use crate::execution::options::ArrowReadOptions;
use crate::prelude::SessionContext;

#[tokio::test]
async fn test_write_empty_arrow_from_sql() -> Result<()> {
let ctx = SessionContext::new();

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/empty_sql.arrow", tmp_dir.path().to_string_lossy());

ctx.sql(&format!(
"COPY (SELECT CAST(1 AS BIGINT) AS id LIMIT 0) TO '{path}' STORED AS ARROW",
))
.await?
.collect()
.await?;

assert!(std::path::Path::new(&path).exists());

let read_df = ctx.read_arrow(&path, ArrowReadOptions::default()).await?;
let stream = read_df.execute_stream().await?;

assert_eq!(stream.schema().fields().len(), 1);
assert_eq!(stream.schema().field(0).name(), "id");

let results: Vec<_> = stream.collect().await;
let total_rows: usize = results
.iter()
.filter_map(|r| r.as_ref().ok())
.map(|b| b.num_rows())
.sum();
assert_eq!(total_rows, 0);

Ok(())
}

#[tokio::test]
async fn test_write_empty_arrow_from_record_batch() -> Result<()> {
let ctx = SessionContext::new();

let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, true),
]));
let empty_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(Int64Array::from(Vec::<i64>::new())),
Arc::new(StringArray::from(Vec::<Option<&str>>::new())),
],
)?;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/empty_batch.arrow", tmp_dir.path().to_string_lossy());

ctx.register_batch("empty_table", empty_batch)?;

ctx.sql(&format!("COPY empty_table TO '{path}' STORED AS ARROW"))
.await?
.collect()
.await?;

assert!(std::path::Path::new(&path).exists());

let read_df = ctx.read_arrow(&path, ArrowReadOptions::default()).await?;
let stream = read_df.execute_stream().await?;

assert_eq!(stream.schema().fields().len(), 2);
assert_eq!(stream.schema().field(0).name(), "id");
assert_eq!(stream.schema().field(1).name(), "name");

let results: Vec<_> = stream.collect().await;
let total_rows: usize = results
.iter()
.filter_map(|r| r.as_ref().ok())
.map(|b| b.num_rows())
.sum();
assert_eq!(total_rows, 0);

Ok(())
}
}
62 changes: 62 additions & 0 deletions datafusion/core/src/datasource/file_format/csv.rs
@@ -1537,6 +1537,68 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_write_empty_csv_from_sql() -> Result<()> {
let ctx = SessionContext::new();
let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/empty_sql.csv", tmp_dir.path().to_string_lossy());
let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
df.write_csv(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
assert!(std::path::Path::new(&path).exists());

let read_df = ctx
.read_csv(&path, CsvReadOptions::default().has_header(true))
.await?;
let stream = read_df.execute_stream().await?;
assert_eq!(stream.schema().fields().len(), 1);
assert_eq!(stream.schema().field(0).name(), "id");

let results: Vec<_> = stream.collect().await;
assert_eq!(results.len(), 0);

Ok(())
}

#[tokio::test]
async fn test_write_empty_csv_from_record_batch() -> Result<()> {
let ctx = SessionContext::new();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, true),
]));
let empty_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow::array::Int64Array::from(Vec::<i64>::new())),
Arc::new(StringArray::from(Vec::<Option<&str>>::new())),
],
)?;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/empty_batch.csv", tmp_dir.path().to_string_lossy());

// Write empty RecordBatch
let df = ctx.read_batch(empty_batch.clone())?;
df.write_csv(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// Expected the file to exist
assert!(std::path::Path::new(&path).exists());
Contributor: Curious why there is no assertion of 0 lines here like you did for arrow, parquet files?

Contributor (Author): I missed it. Added assertions now to verify schema and 0 rows for CSV.

let read_df = ctx
.read_csv(&path, CsvReadOptions::default().has_header(true))
.await?;
let stream = read_df.execute_stream().await?;
assert_eq!(stream.schema().fields().len(), 2);
assert_eq!(stream.schema().field(0).name(), "id");
assert_eq!(stream.schema().field(1).name(), "name");

let results: Vec<_> = stream.collect().await;
assert_eq!(results.len(), 0);

Ok(())
}

#[tokio::test]
async fn test_infer_schema_with_zero_max_records() -> Result<()> {
let session_ctx = SessionContext::new();
42 changes: 42 additions & 0 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -349,4 +349,46 @@ mod tests {
fn fmt_batches(batches: &[RecordBatch]) -> String {
pretty::pretty_format_batches(batches).unwrap().to_string()
}

#[tokio::test]
async fn test_write_empty_json_from_sql() -> Result<()> {
let ctx = SessionContext::new();
let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/empty_sql.json", tmp_dir.path().to_string_lossy());
let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// Expected the file to exist and be empty
assert!(std::path::Path::new(&path).exists());
let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
Ok(())
}

#[tokio::test]
async fn test_write_empty_json_from_record_batch() -> Result<()> {
let ctx = SessionContext::new();
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, true),
]));
let empty_batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(arrow::array::Int64Array::from(Vec::<i64>::new())),
Arc::new(arrow::array::StringArray::from(Vec::<Option<&str>>::new())),
],
)?;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/empty_batch.json", tmp_dir.path().to_string_lossy());
let df = ctx.read_batch(empty_batch.clone())?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// Expected the file to exist and be empty
assert!(std::path::Path::new(&path).exists());
Contributor: Curious why there is no assertion of 0 lines here like you did for arrow, parquet files?

Contributor (Author): For JSON, empty files can't be read back to validate the schema, so a file-size check is added instead.

let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
Ok(())
}
}
22 changes: 22 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
@@ -1366,6 +1366,28 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn test_write_empty_parquet_from_sql() -> Result<()> {
let ctx = SessionContext::new();

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/empty_sql.parquet", tmp_dir.path().to_string_lossy());
let df = ctx.sql("SELECT CAST(1 AS INT) AS id LIMIT 0").await?;
df.write_parquet(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// Expected the file to exist
assert!(std::path::Path::new(&path).exists());
let read_df = ctx.read_parquet(&path, ParquetReadOptions::new()).await?;
let stream = read_df.execute_stream().await?;
assert_eq!(stream.schema().fields().len(), 1);
assert_eq!(stream.schema().field(0).name(), "id");

let results: Vec<_> = stream.collect().await;
assert_eq!(results.len(), 0);

Ok(())
}

#[tokio::test]
async fn parquet_sink_write_insert_schema_into_metadata() -> Result<()> {
// expected kv metadata without schema
17 changes: 17 additions & 0 deletions datafusion/datasource/src/write/demux.rs
@@ -191,7 +191,11 @@ async fn row_count_demuxer(
part_idx += 1;
}

let schema = input.schema();
let mut is_batch_received = false;

while let Some(rb) = input.next().await.transpose()? {
is_batch_received = true;
// ensure we have at least minimum_parallel_files open
if open_file_streams.len() < minimum_parallel_files {
open_file_streams.push(create_new_file_stream(
@@ -228,6 +232,19 @@

next_send_steam = (next_send_steam + 1) % minimum_parallel_files;
}

// if no batch was received but single_file_output is set, send an empty batch so the file is still created
if single_file_output && !is_batch_received {
Contributor: The docs of the DataFrameWriteOptions::with_single_file_output() method should also be updated to mention the empty DataFrame behavior.

Contributor (Author): Done. Updated the doc comment for with_single_file_output().

open_file_streams
.first_mut()
.ok_or_else(|| internal_datafusion_err!("Expected a single output file"))?
.send(RecordBatch::new_empty(schema))
.await
.map_err(|_| {
exec_datafusion_err!("Error sending empty RecordBatch to file stream!")
})?;
}

Ok(())
}

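As background for the demux change above, a minimal standalone sketch (not from the PR; the field names are hypothetical) showing that RecordBatch::new_empty produces a zero-row batch that still carries the full schema, which is why forwarding it lets the writer emit a valid, schema-bearing output file.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    // Hypothetical schema standing in for the demuxer's input schema.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
    ]));

    // Zero rows, but the schema is preserved, so the downstream writer can
    // still produce a well-formed (empty) output file.
    let batch = RecordBatch::new_empty(schema.clone());
    assert_eq!(batch.num_rows(), 0);
    assert_eq!(batch.schema(), schema);
}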