From 590f97e1f10cd5c3dc875c9c9614c2eb1859a5af Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Thu, 22 Jan 2026 11:54:30 +0800 Subject: [PATCH 1/3] array json support --- Cargo.lock | 1 + .../custom_data_source/csv_json_opener.rs | 1 + datafusion/common/src/config.rs | 16 + .../core/src/datasource/file_format/json.rs | 273 ++++++++++++++++++ .../src/datasource/file_format/options.rs | 14 +- datafusion/core/tests/data/json_array.json | 5 + .../core/tests/data/json_empty_array.json | 1 + datafusion/datasource-json/Cargo.toml | 1 + datafusion/datasource-json/src/file_format.rs | 95 +++++- datafusion/datasource-json/src/source.rs | 122 ++++++-- .../proto/datafusion_common.proto | 1 + datafusion/proto-common/src/from_proto/mod.rs | 1 + .../proto-common/src/generated/pbjson.rs | 18 ++ .../proto-common/src/generated/prost.rs | 3 + datafusion/proto-common/src/to_proto/mod.rs | 1 + .../src/generated/datafusion_proto_common.rs | 3 + .../proto/src/logical_plan/file_formats.rs | 2 + datafusion/sqllogictest/test_files/json.slt | 28 ++ 18 files changed, 557 insertions(+), 29 deletions(-) create mode 100644 datafusion/core/tests/data/json_array.json create mode 100644 datafusion/core/tests/data/json_empty_array.json diff --git a/Cargo.lock b/Cargo.lock index 3dc276d7c2310..fa27e6977c624 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2033,6 +2033,7 @@ dependencies = [ "datafusion-session", "futures", "object_store", + "serde_json", "tokio", ] diff --git a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs index 347f1a0464716..27acf508563ba 100644 --- a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs +++ b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs @@ -125,6 +125,7 @@ async fn json_opener() -> Result<()> { projected, FileCompressionType::UNCOMPRESSED, Arc::new(object_store), + false, ); let scan_config = FileScanConfigBuilder::new( diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 87344914d2f7e..8f2a07a83385c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -3065,6 +3065,22 @@ config_namespace! { /// If not specified, the default level for the compression algorithm is used. pub compression_level: Option, default = None pub schema_infer_max_rec: Option, default = None + /// The format of JSON input files. 
+ /// + /// When `false` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `true`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub format_array: bool, default = false } } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index cb2e9d787ee92..35b3acb7a603d 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -47,6 +47,7 @@ mod tests { use datafusion_common::stats::Precision; use datafusion_common::Result; + use datafusion_datasource::file_compression_type::FileCompressionType; use futures::StreamExt; use insta::assert_snapshot; use object_store::local::LocalFileSystem; @@ -391,4 +392,276 @@ mod tests { assert_eq!(metadata.len(), 0); Ok(()) } + + #[tokio::test] + async fn test_json_array_format() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let store = Arc::new(LocalFileSystem::new()) as _; + + // Create a temporary file with JSON array format + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"a": 1, "b": 2.0, "c": true}, + {"a": 2, "b": 3.5, "c": false}, + {"a": 3, "b": 4.0, "c": true} + ]"#, + )?; + + // Test with format_array = true + let format = JsonFormat::default().with_format_array(true); + let file_schema = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await + .expect("Schema inference"); + + let fields = file_schema + .fields() + .iter() + .map(|f| format!("{}: {:?}", f.name(), f.data_type())) + .collect::>(); + assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_empty() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let store = Arc::new(LocalFileSystem::new()) as _; + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/empty_array.json", tmp_dir.path().to_string_lossy()); + std::fs::write(&path, "[]")?; + + let format = JsonFormat::default().with_format_array(true); + let result = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await; + + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("JSON array is empty") + ); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_with_limit() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let store = Arc::new(LocalFileSystem::new()) as _; + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array_limit.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"a": 1}, + {"a": 2, "b": "extra"} + ]"#, + )?; + + // Only infer from first record + let format = JsonFormat::default() + .with_format_array(true) + .with_schema_infer_max_rec(1); + + let file_schema = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await + .expect("Schema inference"); + + // Should only have field "a" since we limited to 1 record + let fields = file_schema + .fields() + .iter() + .map(|f| format!("{}: {:?}", f.name(), f.data_type())) + .collect::>(); + assert_eq!(vec!["a: Int64"], fields); + + Ok(()) + } + + #[tokio::test] + async fn 
test_json_array_format_read_data() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let task_ctx = ctx.task_ctx(); + let store = Arc::new(LocalFileSystem::new()) as _; + + // Create a temporary file with JSON array format + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"a": 1, "b": 2.0, "c": true}, + {"a": 2, "b": 3.5, "c": false}, + {"a": 3, "b": 4.0, "c": true} + ]"#, + )?; + + let format = JsonFormat::default().with_format_array(true); + + // Infer schema + let file_schema = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await?; + + // Scan and read data + let exec = scan_format( + &ctx, + &format, + Some(file_schema), + tmp_dir.path().to_str().unwrap(), + "array.json", + None, + None, + ) + .await?; + let batches = collect(exec, task_ctx).await?; + + assert_eq!(1, batches.len()); + assert_eq!(3, batches[0].num_columns()); + assert_eq!(3, batches[0].num_rows()); + + // Verify data + let array_a = as_int64_array(batches[0].column(0))?; + assert_eq!( + vec![1, 2, 3], + (0..3).map(|i| array_a.value(i)).collect::>() + ); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_with_projection() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let task_ctx = ctx.task_ctx(); + let store = Arc::new(LocalFileSystem::new()) as _; + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); + std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#)?; + + let format = JsonFormat::default().with_format_array(true); + let file_schema = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await?; + + // Project only column "a" + let exec = scan_format( + &ctx, + &format, + Some(file_schema), + tmp_dir.path().to_str().unwrap(), + "array.json", + Some(vec![0]), + None, + ) + .await?; + let batches = collect(exec, task_ctx).await?; + + assert_eq!(1, batches.len()); + assert_eq!(1, batches[0].num_columns()); // Only 1 column projected + assert_eq!(2, batches[0].num_rows()); + + Ok(()) + } + + #[tokio::test] + async fn test_ndjson_read_options_format_array() -> Result<()> { + let ctx = SessionContext::new(); + + // Create a temporary file with JSON array format + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"a": 1, "b": "hello"}, + {"a": 2, "b": "world"}, + {"a": 3, "b": "test"} + ]"#, + )?; + + // Use NdJsonReadOptions with format_array = true + let options = NdJsonReadOptions::default().format_array(true); + + ctx.register_json("json_array_table", &path, options) + .await?; + + let result = ctx + .sql("SELECT a, b FROM json_array_table ORDER BY a") + .await? 
+ .collect() + .await?; + + assert_snapshot!(batches_to_string(&result), @r" + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + | 3 | test | + +---+-------+ + "); + + Ok(()) + } + + #[tokio::test] + async fn test_ndjson_read_options_format_array_with_compression() -> Result<()> { + use flate2::Compression; + use flate2::write::GzEncoder; + use std::io::Write; + + let ctx = SessionContext::new(); + + // Create a temporary gzip compressed JSON array file + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy()); + + let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#; + let file = std::fs::File::create(&path)?; + let mut encoder = GzEncoder::new(file, Compression::default()); + encoder.write_all(json_content.as_bytes())?; + encoder.finish()?; + + // Use NdJsonReadOptions with format_array and GZIP compression + let options = NdJsonReadOptions::default() + .format_array(true) + .file_compression_type(FileCompressionType::GZIP) + .file_extension(".json.gz"); + + ctx.register_json("json_array_gzip", &path, options).await?; + + let result = ctx + .sql("SELECT a, b FROM json_array_gzip ORDER BY a") + .await? + .collect() + .await?; + + assert_snapshot!(batches_to_string(&result), @r" + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + +---+-------+ + "); + + Ok(()) + } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 146c5f6f5fd0f..8fa9ef28d78bc 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -465,6 +465,9 @@ pub struct NdJsonReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, + /// Whether the JSON file is in array format `[{...}, {...}]` instead of + /// line-delimited format. Defaults to `false`. + pub format_array: bool, } impl Default for NdJsonReadOptions<'_> { @@ -477,6 +480,7 @@ impl Default for NdJsonReadOptions<'_> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], + format_array: false, } } } @@ -529,6 +533,13 @@ impl<'a> NdJsonReadOptions<'a> { self.schema_infer_max_records = schema_infer_max_records; self } + + /// Specify whether the JSON file is in array format `[{...}, {...}]` + /// instead of line-delimited format. 
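+    ///
+    /// Illustrative usage (a sketch mirroring the tests added in this patch):
+    /// ```ignore
+    /// let options = NdJsonReadOptions::default().format_array(true);
+    /// ```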
+ pub fn format_array(mut self, format_array: bool) -> Self { + self.format_array = format_array; + self + } } #[async_trait] @@ -663,7 +674,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { let file_format = JsonFormat::default() .with_options(table_options.json) .with_schema_infer_max_rec(self.schema_infer_max_records) - .with_file_compression_type(self.file_compression_type.to_owned()); + .with_file_compression_type(self.file_compression_type.to_owned()) + .with_format_array(self.format_array); ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) diff --git a/datafusion/core/tests/data/json_array.json b/datafusion/core/tests/data/json_array.json new file mode 100644 index 0000000000000..1a8716dbf4beb --- /dev/null +++ b/datafusion/core/tests/data/json_array.json @@ -0,0 +1,5 @@ +[ + {"a": 1, "b": "hello"}, + {"a": 2, "b": "world"}, + {"a": 3, "b": "test"} +] diff --git a/datafusion/core/tests/data/json_empty_array.json b/datafusion/core/tests/data/json_empty_array.json new file mode 100644 index 0000000000000..fe51488c7066f --- /dev/null +++ b/datafusion/core/tests/data/json_empty_array.json @@ -0,0 +1 @@ +[] diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml index 37fa8d43a0816..168ae8880eee7 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -44,6 +44,7 @@ datafusion-physical-plan = { workspace = true } datafusion-session = { workspace = true } futures = { workspace = true } object_store = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true } # Note: add additional linter rules in lib.rs. diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs index a14458b5acd36..f6b258eb7c078 100644 --- a/datafusion/datasource-json/src/file_format.rs +++ b/datafusion/datasource-json/src/file_format.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions +//! [`JsonFormat`]: Line delimited and array JSON [`FileFormat`] abstractions use std::any::Any; use std::collections::HashMap; use std::fmt; use std::fmt::Debug; -use std::io::BufReader; +use std::io::{BufReader, Read}; use std::sync::Arc; use crate::source::JsonSource; @@ -48,6 +48,7 @@ use datafusion_datasource::file_format::{ use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; +use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::write::BatchSerializer; use datafusion_datasource::write::demux::DemuxedStreamReceiver; use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join; @@ -59,7 +60,6 @@ use datafusion_session::Session; use async_trait::async_trait; use bytes::{Buf, Bytes}; -use datafusion_datasource::source::DataSourceExec; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; #[derive(Default)] @@ -132,7 +132,23 @@ impl Debug for JsonFormatFactory { } } -/// New line delimited JSON `FileFormat` implementation. +/// JSON `FileFormat` implementation supporting both line-delimited and array formats. 
+/// +/// # Supported Formats +/// +/// ## Line-Delimited JSON (default) +/// ```text +/// {"key1": 1, "key2": "val"} +/// {"key1": 2, "key2": "vals"} +/// ``` +/// +/// ## JSON Array Format (when `format_array` option is true) +/// ```text +/// [ +/// {"key1": 1, "key2": "val"}, +/// {"key1": 2, "key2": "vals"} +/// ] +/// ``` #[derive(Debug, Default)] pub struct JsonFormat { options: JsonOptions, @@ -166,6 +182,49 @@ impl JsonFormat { self.options.compression = file_compression_type.into(); self } + + /// Set whether to expect JSON array format instead of line-delimited format. + /// + /// When `true`, expects input like: `[{"a": 1}, {"a": 2}]` + /// When `false` (default), expects input like: + /// ```text + /// {"a": 1} + /// {"a": 2} + /// ``` + pub fn with_format_array(mut self, format_array: bool) -> Self { + self.options.format_array = format_array; + self + } +} + +/// Infer schema from a JSON array format file. +/// +/// This function reads JSON data in array format `[{...}, {...}]` and infers +/// the Arrow schema from the contained objects. +fn infer_json_schema_from_json_array( + reader: &mut R, + max_records: usize, +) -> std::result::Result { + let mut content = String::new(); + reader.read_to_string(&mut content).map_err(|e| { + ArrowError::JsonError(format!("Failed to read JSON content: {e}")) + })?; + + // Parse as JSON array using serde_json + let values: Vec = serde_json::from_str(&content) + .map_err(|e| ArrowError::JsonError(format!("Failed to parse JSON array: {e}")))?; + + // Take only max_records for schema inference + let values_to_infer: Vec<_> = values.into_iter().take(max_records).collect(); + + if values_to_infer.is_empty() { + return Err(ArrowError::JsonError( + "JSON array is empty, cannot infer schema".to_string(), + )); + } + + // Use arrow's schema inference on the parsed values + infer_json_schema_from_iterator(values_to_infer.into_iter().map(Ok)) } #[async_trait] @@ -202,6 +261,8 @@ impl FileFormat for JsonFormat { .schema_infer_max_rec .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD); let file_compression_type = FileCompressionType::from(self.options.compression); + let is_array_format = self.options.format_array; + for object in objects { let mut take_while = || { let should_take = records_to_read > 0; @@ -217,15 +278,29 @@ impl FileFormat for JsonFormat { GetResultPayload::File(file, _) => { let decoder = file_compression_type.convert_read(file)?; let mut reader = BufReader::new(decoder); - let iter = ValueIter::new(&mut reader, None); - infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? + + if is_array_format { + infer_json_schema_from_json_array(&mut reader, records_to_read)? + } else { + let iter = ValueIter::new(&mut reader, None); + infer_json_schema_from_iterator( + iter.take_while(|_| take_while()), + )? + } } GetResultPayload::Stream(_) => { let data = r.bytes().await?; let decoder = file_compression_type.convert_read(data.reader())?; let mut reader = BufReader::new(decoder); - let iter = ValueIter::new(&mut reader, None); - infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? + + if is_array_format { + infer_json_schema_from_json_array(&mut reader, records_to_read)? + } else { + let iter = ValueIter::new(&mut reader, None); + infer_json_schema_from_iterator( + iter.take_while(|_| take_while()), + )? 
+ } } }; @@ -281,7 +356,9 @@ impl FileFormat for JsonFormat { } fn file_source(&self, table_schema: TableSchema) -> Arc { - Arc::new(JsonSource::new(table_schema)) + Arc::new( + JsonSource::new(table_schema).with_format_array(self.options.format_array), + ) } } diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 5797054f11b9c..429e4a189aed4 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -36,6 +36,7 @@ use datafusion_datasource::{ use datafusion_physical_plan::projection::ProjectionExprs; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use arrow::array::RecordBatch; use arrow::json::ReaderBuilder; use arrow::{datatypes::SchemaRef, json}; use datafusion_datasource::file::FileSource; @@ -54,6 +55,7 @@ pub struct JsonOpener { projected_schema: SchemaRef, file_compression_type: FileCompressionType, object_store: Arc, + format_array: bool, } impl JsonOpener { @@ -63,12 +65,14 @@ impl JsonOpener { projected_schema: SchemaRef, file_compression_type: FileCompressionType, object_store: Arc, + format_array: bool, ) -> Self { Self { batch_size, projected_schema, file_compression_type, object_store, + format_array, } } } @@ -80,6 +84,7 @@ pub struct JsonSource { batch_size: Option, metrics: ExecutionPlanMetricsSet, projection: SplitProjection, + format_array: bool, } impl JsonSource { @@ -91,8 +96,15 @@ impl JsonSource { table_schema, batch_size: None, metrics: ExecutionPlanMetricsSet::new(), + format_array: false, } } + + /// Set whether to expect JSON array format + pub fn with_format_array(mut self, format_array: bool) -> Self { + self.format_array = format_array; + self + } } impl From for Arc { @@ -120,6 +132,7 @@ impl FileSource for JsonSource { projected_schema, file_compression_type: base_config.file_compression_type, object_store, + format_array: self.format_array, }) as Arc; // Wrap with ProjectionOpener @@ -186,6 +199,16 @@ impl FileOpener for JsonOpener { let schema = Arc::clone(&self.projected_schema); let batch_size = self.batch_size; let file_compression_type = self.file_compression_type.to_owned(); + let format_array = self.format_array; + + // JSON array format requires reading the complete file + if format_array && partitioned_file.range.is_some() { + return Err(DataFusionError::NotImplemented( + "JSON array format does not support range-based file scanning. \ + Disable repartition_file_scans or use line-delimited JSON format." 
+ .to_string(), + )); + } Ok(Box::pin(async move { let calculated_range = @@ -222,33 +245,94 @@ impl FileOpener for JsonOpener { } }; - let reader = ReaderBuilder::new(schema) - .with_batch_size(batch_size) - .build(BufReader::new(bytes))?; - - Ok(futures::stream::iter(reader) - .map(|r| r.map_err(Into::into)) - .boxed()) + if format_array { + // Handle JSON array format + let batches = read_json_array_to_batches( + BufReader::new(bytes), + schema, + batch_size, + )?; + Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed()) + } else { + let reader = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build(BufReader::new(bytes))?; + Ok(futures::stream::iter(reader) + .map(|r| r.map_err(Into::into)) + .boxed()) + } } GetResultPayload::Stream(s) => { - let s = s.map_err(DataFusionError::from); - - let decoder = ReaderBuilder::new(schema) - .with_batch_size(batch_size) - .build_decoder()?; - let input = file_compression_type.convert_stream(s.boxed())?.fuse(); - - let stream = deserialize_stream( - input, - DecoderDeserializer::new(JsonDecoder::new(decoder)), - ); - Ok(stream.map_err(Into::into).boxed()) + if format_array { + // For streaming, we need to collect all bytes first + let bytes = s + .map_err(DataFusionError::from) + .try_fold(Vec::new(), |mut acc, chunk| async move { + acc.extend_from_slice(&chunk); + Ok(acc) + }) + .await?; + let decompressed = file_compression_type + .convert_read(std::io::Cursor::new(bytes))?; + let batches = read_json_array_to_batches( + BufReader::new(decompressed), + schema, + batch_size, + )?; + Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed()) + } else { + let s = s.map_err(DataFusionError::from); + let decoder = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build_decoder()?; + let input = + file_compression_type.convert_stream(s.boxed())?.fuse(); + let stream = deserialize_stream( + input, + DecoderDeserializer::new(JsonDecoder::new(decoder)), + ); + Ok(stream.map_err(Into::into).boxed()) + } } } })) } } +/// Read JSON array format and convert to RecordBatches +fn read_json_array_to_batches( + mut reader: R, + schema: SchemaRef, + batch_size: usize, +) -> Result> { + use arrow::json::ReaderBuilder; + + let mut content = String::new(); + reader.read_to_string(&mut content)?; + + // Parse JSON array + let values: Vec = serde_json::from_str(&content) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + if values.is_empty() { + return Ok(vec![RecordBatch::new_empty(schema)]); + } + + // Convert to NDJSON string for arrow-json reader + let ndjson: String = values + .iter() + .map(|v| v.to_string()) + .collect::>() + .join("\n"); + + let cursor = std::io::Cursor::new(ndjson); + let reader = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build(cursor)?; + + reader.collect::, _>>().map_err(Into::into) +} + pub async fn plan_to_json( task_ctx: Arc, plan: Arc, diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 08bb25bd715b9..eb8b1664744c0 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -469,6 +469,7 @@ message JsonOptions { CompressionTypeVariant compression = 1; // Compression type optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference optional uint32 compression_level = 3; // Optional compression level + bool format_array = 4; // Whether the JSON is in array format [{},...] 
(default false = line-delimited) } message TableParquetOptions { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 3c41b8cad9ed1..27acece6d3fd9 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1105,6 +1105,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions { compression: compression.into(), compression_level: proto_opts.compression_level, schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize), + format_array: proto_opts.format_array, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index ef0eae1981d93..352199c6693f9 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -4589,6 +4589,9 @@ impl serde::Serialize for JsonOptions { if self.compression_level.is_some() { len += 1; } + if self.format_array { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.JsonOptions", len)?; if self.compression != 0 { let v = CompressionTypeVariant::try_from(self.compression) @@ -4603,6 +4606,9 @@ impl serde::Serialize for JsonOptions { if let Some(v) = self.compression_level.as_ref() { struct_ser.serialize_field("compressionLevel", v)?; } + if self.format_array { + struct_ser.serialize_field("formatArray", &self.format_array)?; + } struct_ser.end() } } @@ -4618,6 +4624,8 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "schemaInferMaxRec", "compression_level", "compressionLevel", + "format_array", + "formatArray", ]; #[allow(clippy::enum_variant_names)] @@ -4625,6 +4633,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { Compression, SchemaInferMaxRec, CompressionLevel, + FormatArray, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -4649,6 +4658,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "compression" => Ok(GeneratedField::Compression), "schemaInferMaxRec" | "schema_infer_max_rec" => Ok(GeneratedField::SchemaInferMaxRec), "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel), + "formatArray" | "format_array" => Ok(GeneratedField::FormatArray), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4671,6 +4681,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { let mut compression__ = None; let mut schema_infer_max_rec__ = None; let mut compression_level__ = None; + let mut format_array__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Compression => { @@ -4695,12 +4706,19 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } + GeneratedField::FormatArray => { + if format_array__.is_some() { + return Err(serde::de::Error::duplicate_field("formatArray")); + } + format_array__ = Some(map_.next_value()?); + } } } Ok(JsonOptions { compression: compression__.unwrap_or_default(), schema_infer_max_rec: schema_infer_max_rec__, compression_level: compression_level__, + format_array: format_array__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 16601dcf46977..afdb6628225b2 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -665,6 +665,9 @@ pub struct JsonOptions { /// Optional compression level #[prost(uint32, optional, tag = "3")] pub compression_level: ::core::option::Option, + /// Whether the JSON is in array format \[{},...\] (default false = line-delimited) + #[prost(bool, tag = "4")] + pub format_array: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index fee3656482005..ef7563230f49d 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -990,6 +990,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions { compression: compression.into(), schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64), compression_level: opts.compression_level, + format_array: opts.format_array, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 16601dcf46977..afdb6628225b2 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -665,6 +665,9 @@ pub struct JsonOptions { /// Optional compression level #[prost(uint32, optional, tag = "3")] pub compression_level: ::core::option::Option, + /// Whether the JSON is in array format \[{},...\] (default false = line-delimited) + #[prost(bool, tag = "4")] + pub format_array: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 436a06493766d..e753e34462d86 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -241,6 +241,7 @@ impl JsonOptionsProto { compression: options.compression as i32, schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64), compression_level: options.compression_level, + format_array: options.format_array, } } else { JsonOptionsProto::default() @@ -260,6 +261,7 @@ impl From<&JsonOptionsProto> for JsonOptions { }, schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize), compression_level: proto.compression_level, + format_array: proto.format_array, } } } diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt index b46b8c49d6623..4442a6a2d5af2 100644 --- a/datafusion/sqllogictest/test_files/json.slt +++ b/datafusion/sqllogictest/test_files/json.slt @@ -146,3 +146,31 @@ EXPLAIN SELECT id FROM json_partitioned_test WHERE part = 2 ---- 
logical_plan TableScan: json_partitioned_test projection=[id], full_filters=[json_partitioned_test.part = Int32(2)] physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_json/part=2/data.json]]}, projection=[id], file_type=json + +########## +## JSON Array Format Tests +########## + +# Test reading JSON array format file with format_array=true +statement ok +CREATE EXTERNAL TABLE json_array_test +STORED AS JSON +OPTIONS ('format.format_array' 'true') +LOCATION '../core/tests/data/json_array.json'; + +query IT rowsort +SELECT a, b FROM json_array_test +---- +1 hello +2 world +3 test + +statement ok +DROP TABLE json_array_test; + +# Test that reading JSON array format WITHOUT format_array option fails +# (default is line-delimited mode which can't parse array format correctly) +statement error Not valid JSON +CREATE EXTERNAL TABLE json_array_as_ndjson +STORED AS JSON +LOCATION '../core/tests/data/json_array.json'; \ No newline at end of file From 00692afab32944c9443534a4547d5ecdb11aa555 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 23 Jan 2026 22:06:21 +0800 Subject: [PATCH 2/3] Address comments and redesign --- datafusion/common/src/config.rs | 35 +- .../core/src/datasource/file_format/json.rs | 313 ++++++++++++++++-- .../src/datasource/file_format/options.rs | 48 ++- datafusion/datasource-json/src/file_format.rs | 154 ++++++--- datafusion/datasource-json/src/source.rs | 94 +++--- .../proto/datafusion_common.proto | 2 +- datafusion/proto-common/src/from_proto/mod.rs | 2 +- .../proto-common/src/generated/pbjson.rs | 26 +- .../proto-common/src/generated/prost.rs | 4 +- datafusion/proto-common/src/to_proto/mod.rs | 2 +- .../src/generated/datafusion_proto_common.rs | 4 +- .../proto/src/logical_plan/file_formats.rs | 4 +- datafusion/sqllogictest/test_files/json.slt | 10 +- 13 files changed, 541 insertions(+), 157 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 8f2a07a83385c..580aad561c67b 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -3065,22 +3065,25 @@ config_namespace! { /// If not specified, the default level for the compression algorithm is used. pub compression_level: Option, default = None pub schema_infer_max_rec: Option, default = None - /// The format of JSON input files. - /// - /// When `false` (default), expects newline-delimited JSON (NDJSON): - /// ```text - /// {"key1": 1, "key2": "val"} - /// {"key1": 2, "key2": "vals"} - /// ``` - /// - /// When `true`, expects JSON array format: - /// ```text - /// [ - /// {"key1": 1, "key2": "val"}, - /// {"key1": 2, "key2": "vals"} - /// ] - /// ``` - pub format_array: bool, default = false + /// The JSON format to use when reading files. + /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + /// + /// Note: JSON array format requires loading the entire file into memory. + /// For large files, newline-delimited format is recommended. 
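+        ///
+        /// Illustrative SQL usage (a sketch; assumes the `format.` option prefix
+        /// used by the other JSON options):
+        /// ```sql
+        /// CREATE EXTERNAL TABLE t
+        /// STORED AS JSON
+        /// OPTIONS ('format.newline_delimited' 'false')
+        /// LOCATION 'data/array.json';
+        /// ```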
+ pub newline_delimited: bool, default = true } } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 35b3acb7a603d..56c2a2b28a80a 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -411,8 +411,8 @@ mod tests { ]"#, )?; - // Test with format_array = true - let format = JsonFormat::default().with_format_array(true); + // Test with newline_delimited = false (JSON array format) + let format = JsonFormat::default().with_newline_delimited(false); let file_schema = format .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) .await @@ -438,18 +438,14 @@ mod tests { let path = format!("{}/empty_array.json", tmp_dir.path().to_string_lossy()); std::fs::write(&path, "[]")?; - let format = JsonFormat::default().with_format_array(true); - let result = format + let format = JsonFormat::default().with_newline_delimited(false); + let file_schema = format .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) - .await; + .await + .expect("Schema inference for empty array"); - assert!(result.is_err()); - assert!( - result - .unwrap_err() - .to_string() - .contains("JSON array is empty") - ); + // Empty array should return empty schema + assert_eq!(file_schema.fields().len(), 0); Ok(()) } @@ -472,7 +468,7 @@ mod tests { // Only infer from first record let format = JsonFormat::default() - .with_format_array(true) + .with_newline_delimited(false) .with_schema_infer_max_rec(1); let file_schema = format @@ -510,7 +506,7 @@ mod tests { ]"#, )?; - let format = JsonFormat::default().with_format_array(true); + let format = JsonFormat::default().with_newline_delimited(false); // Infer schema let file_schema = format @@ -555,7 +551,7 @@ mod tests { let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#)?; - let format = JsonFormat::default().with_format_array(true); + let format = JsonFormat::default().with_newline_delimited(false); let file_schema = format .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) .await?; @@ -581,7 +577,7 @@ mod tests { } #[tokio::test] - async fn test_ndjson_read_options_format_array() -> Result<()> { + async fn test_ndjson_read_options_newline_delimited() -> Result<()> { let ctx = SessionContext::new(); // Create a temporary file with JSON array format @@ -596,8 +592,8 @@ mod tests { ]"#, )?; - // Use NdJsonReadOptions with format_array = true - let options = NdJsonReadOptions::default().format_array(true); + // Use NdJsonReadOptions with newline_delimited = false (JSON array format) + let options = NdJsonReadOptions::default().newline_delimited(false); ctx.register_json("json_array_table", &path, options) .await?; @@ -622,7 +618,7 @@ mod tests { } #[tokio::test] - async fn test_ndjson_read_options_format_array_with_compression() -> Result<()> { + async fn test_ndjson_read_options_json_array_with_compression() -> Result<()> { use flate2::Compression; use flate2::write::GzEncoder; use std::io::Write; @@ -639,9 +635,9 @@ mod tests { encoder.write_all(json_content.as_bytes())?; encoder.finish()?; - // Use NdJsonReadOptions with format_array and GZIP compression + // Use NdJsonReadOptions with newline_delimited = false and GZIP compression let options = NdJsonReadOptions::default() - .format_array(true) + .newline_delimited(false) .file_compression_type(FileCompressionType::GZIP) .file_extension(".json.gz"); @@ -664,4 +660,279 
@@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_json_array_format_with_nested_struct() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let task_ctx = ctx.task_ctx(); + let store = Arc::new(LocalFileSystem::new()) as _; + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/nested.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"id": 1, "info": {"name": "Alice", "age": 30}}, + {"id": 2, "info": {"name": "Bob", "age": 25}}, + {"id": 3, "info": {"name": "Charlie", "age": 35}} + ]"#, + )?; + + let format = JsonFormat::default().with_newline_delimited(false); + let file_schema = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await?; + + // Verify nested struct in schema + let info_field = file_schema.field_with_name("info").unwrap(); + assert!(matches!(info_field.data_type(), DataType::Struct(_))); + + let exec = scan_format( + &ctx, + &format, + Some(file_schema), + tmp_dir.path().to_str().unwrap(), + "nested.json", + None, + None, + ) + .await?; + let batches = collect(exec, task_ctx).await?; + + assert_eq!(1, batches.len()); + assert_eq!(3, batches[0].num_rows()); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_with_list() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let task_ctx = ctx.task_ctx(); + let store = Arc::new(LocalFileSystem::new()) as _; + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/list.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"id": 1, "tags": ["a", "b", "c"]}, + {"id": 2, "tags": ["d", "e"]}, + {"id": 3, "tags": ["f"]} + ]"#, + )?; + + let format = JsonFormat::default().with_newline_delimited(false); + let file_schema = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await?; + + // Verify list type in schema + let tags_field = file_schema.field_with_name("tags").unwrap(); + assert!(matches!(tags_field.data_type(), DataType::List(_))); + + let exec = scan_format( + &ctx, + &format, + Some(file_schema), + tmp_dir.path().to_str().unwrap(), + "list.json", + None, + None, + ) + .await?; + let batches = collect(exec, task_ctx).await?; + + assert_eq!(1, batches.len()); + assert_eq!(3, batches[0].num_rows()); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_with_list_of_structs() -> Result<()> { + let ctx = SessionContext::new(); + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/list_struct.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"id": 1, "items": [{"name": "item1", "price": 10.5}, {"name": "item2", "price": 20.0}]}, + {"id": 2, "items": [{"name": "item3", "price": 15.0}]}, + {"id": 3, "items": []} + ]"#, + )?; + + let options = NdJsonReadOptions::default().newline_delimited(false); + ctx.register_json("list_struct_table", &path, options) + .await?; + + // Query nested struct fields + let result = ctx + .sql("SELECT id, items FROM list_struct_table ORDER BY id") + .await? 
+ .collect() + .await?; + + assert_eq!(1, result.len()); + assert_eq!(3, result[0].num_rows()); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_with_unnest() -> Result<()> { + let ctx = SessionContext::new(); + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/unnest.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"id": 1, "values": [10, 20, 30]}, + {"id": 2, "values": [40, 50]}, + {"id": 3, "values": [60]} + ]"#, + )?; + + let options = NdJsonReadOptions::default().newline_delimited(false); + ctx.register_json("unnest_table", &path, options).await?; + + // Test UNNEST on array column + let result = ctx + .sql( + "SELECT id, unnest(values) as value FROM unnest_table ORDER BY id, value", + ) + .await? + .collect() + .await?; + + assert_snapshot!(batches_to_string(&result), @r" + +----+-------+ + | id | value | + +----+-------+ + | 1 | 10 | + | 1 | 20 | + | 1 | 30 | + | 2 | 40 | + | 2 | 50 | + | 3 | 60 | + +----+-------+ + "); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_with_unnest_struct() -> Result<()> { + let ctx = SessionContext::new(); + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/unnest_struct.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": "B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#, + )?; + + let options = NdJsonReadOptions::default().newline_delimited(false); + ctx.register_json("unnest_struct_table", &path, options) + .await?; + + // Test UNNEST on List column and access struct fields + let result = ctx + .sql( + "SELECT id, unnest(orders)['product'] as product, unnest(orders)['qty'] as qty + FROM unnest_struct_table + ORDER BY id, product" + ) + .await? + .collect() + .await?; + + assert_snapshot!(batches_to_string(&result), @r" + +----+---------+-----+ + | id | product | qty | + +----+---------+-----+ + | 1 | A | 2 | + | 1 | B | 3 | + | 2 | C | 1 | + +----+---------+-----+ + "); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_deeply_nested() -> Result<()> { + let ctx = SessionContext::new(); + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/deep_nested.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[{"id": 1, "department": {"name": "Engineering", "head": "Alice"}}, {"id": 2, "department": {"name": "Sales", "head": "Bob"}}]"#, + )?; + + let options = NdJsonReadOptions::default().newline_delimited(false); + ctx.register_json("deep_nested_table", &path, options) + .await?; + + // Query nested struct data + let result = ctx + .sql("SELECT id, department['name'] as dept_name, department['head'] as dept_head FROM deep_nested_table ORDER BY id") + .await? 
+ .collect() + .await?; + + assert_snapshot!(batches_to_string(&result), @r" + +----+-------------+-----------+ + | id | dept_name | dept_head | + +----+-------------+-----------+ + | 1 | Engineering | Alice | + | 2 | Sales | Bob | + +----+-------------+-----------+ + "); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_with_null_values() -> Result<()> { + let ctx = SessionContext::new(); + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/nulls.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"id": 1, "name": "Alice", "score": 100}, + {"id": 2, "name": null, "score": 85}, + {"id": 3, "name": "Charlie", "score": null} + ]"#, + )?; + + let options = NdJsonReadOptions::default().newline_delimited(false); + ctx.register_json("null_table", &path, options).await?; + + let result = ctx + .sql("SELECT id, name, score FROM null_table ORDER BY id") + .await? + .collect() + .await?; + + assert_snapshot!(batches_to_string(&result), @r" + +----+---------+-------+ + | id | name | score | + +----+---------+-------+ + | 1 | Alice | 100 | + | 2 | | 85 | + | 3 | Charlie | | + +----+---------+-------+ + "); + + Ok(()) + } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 8fa9ef28d78bc..1798bca86b035 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -442,7 +442,9 @@ impl<'a> AvroReadOptions<'a> { } } -/// Options that control the reading of Line-delimited JSON files (NDJson) +/// Options that control the reading of JSON files. +/// +/// Supports both newline-delimited JSON (NDJSON) and JSON array formats. /// /// Note this structure is supplied when a datasource is created and /// can not not vary from statement to statement. For settings that @@ -465,9 +467,22 @@ pub struct NdJsonReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, - /// Whether the JSON file is in array format `[{...}, {...}]` instead of - /// line-delimited format. Defaults to `false`. - pub format_array: bool, + /// Whether to read as newline-delimited JSON (default: true). + /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub newline_delimited: bool, } impl Default for NdJsonReadOptions<'_> { @@ -480,7 +495,7 @@ impl Default for NdJsonReadOptions<'_> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], - format_array: false, + newline_delimited: true, } } } @@ -534,10 +549,23 @@ impl<'a> NdJsonReadOptions<'a> { self } - /// Specify whether the JSON file is in array format `[{...}, {...}]` - /// instead of line-delimited format. - pub fn format_array(mut self, format_array: bool) -> Self { - self.format_array = format_array; + /// Set whether to read as newline-delimited JSON. 
+ /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub fn newline_delimited(mut self, newline_delimited: bool) -> Self { + self.newline_delimited = newline_delimited; self } } @@ -675,7 +703,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { .with_options(table_options.json) .with_schema_infer_max_rec(self.schema_infer_max_records) .with_file_compression_type(self.file_compression_type.to_owned()) - .with_format_array(self.format_array); + .with_newline_delimited(self.newline_delimited); ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs index f6b258eb7c078..46e67f5768286 100644 --- a/datafusion/datasource-json/src/file_format.rs +++ b/datafusion/datasource-json/src/file_format.rs @@ -30,11 +30,14 @@ use arrow::array::RecordBatch; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::ArrowError; use arrow::json; -use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator}; +use arrow::json::reader::{ + ValueIter, infer_json_schema, infer_json_schema_from_iterator, +}; +use bytes::{Buf, Bytes}; use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions}; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::{ - DEFAULT_JSON_EXTENSION, GetExt, Result, Statistics, not_impl_err, + DEFAULT_JSON_EXTENSION, DataFusionError, GetExt, Result, Statistics, not_impl_err, }; use datafusion_common_runtime::SpawnedTask; use datafusion_datasource::TableSchema; @@ -59,7 +62,6 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; use async_trait::async_trait; -use bytes::{Buf, Bytes}; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; #[derive(Default)] @@ -136,19 +138,22 @@ impl Debug for JsonFormatFactory { /// /// # Supported Formats /// -/// ## Line-Delimited JSON (default) +/// ## Line-Delimited JSON (default, `newline_delimited = true`) /// ```text /// {"key1": 1, "key2": "val"} /// {"key1": 2, "key2": "vals"} /// ``` /// -/// ## JSON Array Format (when `format_array` option is true) +/// ## JSON Array Format (`newline_delimited = false`) /// ```text /// [ /// {"key1": 1, "key2": "val"}, /// {"key1": 2, "key2": "vals"} /// ] /// ``` +/// +/// Note: JSON array format requires loading the entire file into memory, +/// which may not be suitable for very large files. #[derive(Debug, Default)] pub struct JsonFormat { options: JsonOptions, @@ -183,48 +188,106 @@ impl JsonFormat { self } - /// Set whether to expect JSON array format instead of line-delimited format. + /// Set whether to read as newline-delimited JSON (NDJSON). 
/// - /// When `true`, expects input like: `[{"a": 1}, {"a": 2}]` - /// When `false` (default), expects input like: + /// When `true` (default), expects newline-delimited format: /// ```text /// {"a": 1} /// {"a": 2} /// ``` - pub fn with_format_array(mut self, format_array: bool) -> Self { - self.options.format_array = format_array; + /// + /// When `false`, expects JSON array format: + /// ```text + /// [{"a": 1}, {"a": 2}] + /// ``` + pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self { + self.options.newline_delimited = newline_delimited; self } + + /// Returns whether this format expects newline-delimited JSON. + pub fn is_newline_delimited(&self) -> bool { + self.options.newline_delimited + } } -/// Infer schema from a JSON array format file. +/// Extract JSON records from array format using bracket tracking. /// -/// This function reads JSON data in array format `[{...}, {...}]` and infers -/// the Arrow schema from the contained objects. -fn infer_json_schema_from_json_array( - reader: &mut R, - max_records: usize, -) -> std::result::Result { - let mut content = String::new(); - reader.read_to_string(&mut content).map_err(|e| { - ArrowError::JsonError(format!("Failed to read JSON content: {e}")) - })?; - - // Parse as JSON array using serde_json - let values: Vec = serde_json::from_str(&content) - .map_err(|e| ArrowError::JsonError(format!("Failed to parse JSON array: {e}")))?; - - // Take only max_records for schema inference - let values_to_infer: Vec<_> = values.into_iter().take(max_records).collect(); - - if values_to_infer.is_empty() { - return Err(ArrowError::JsonError( - "JSON array is empty, cannot infer schema".to_string(), +/// This avoids full JSON parsing by only tracking brace depth to find +/// record boundaries. Much faster than serde_json::from_str() for large files. +fn extract_json_records(content: &str) -> Result> { + let content = content.trim(); + if !content.starts_with('[') || !content.ends_with(']') { + return Err(DataFusionError::Execution( + "JSON array format must start with '[' and end with ']'".to_string(), )); } - // Use arrow's schema inference on the parsed values - infer_json_schema_from_iterator(values_to_infer.into_iter().map(Ok)) + // Remove outer brackets + let inner = &content[1..content.len() - 1]; + let mut records = Vec::new(); + let mut depth = 0; + let mut in_string = false; + let mut escape_next = false; + let mut record_start: Option = None; + + for (i, ch) in inner.char_indices() { + if escape_next { + escape_next = false; + continue; + } + + match ch { + '\\' if in_string => escape_next = true, + '"' => in_string = !in_string, + '{' if !in_string => { + if depth == 0 { + record_start = Some(i); + } + depth += 1; + } + '}' if !in_string => { + depth -= 1; + if depth == 0 + && let Some(start) = record_start + { + records.push(inner[start..=i].to_string()); + record_start = None; + } + } + _ => {} + } + } + + Ok(records) +} + +/// Infer schema from JSON array format content (synchronous version). +/// +/// This function extracts individual JSON records from array format +/// and uses arrow-json's schema inference on the extracted records. 
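+///
+/// For example, given the content `[{"a": 1}, {"a": 2}]` and `max_records = 1`,
+/// only `{"a": 1}` is used for inference.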
+fn infer_schema_from_json_array_content( + content: &str, + max_records: usize, +) -> Result { + let records = extract_json_records(content)?; + + let records_to_infer: Vec<&str> = records + .iter() + .take(max_records) + .map(|s| s.as_str()) + .collect(); + + if records_to_infer.is_empty() { + return Ok(Schema::empty()); + } + + // Create NDJSON string for arrow-json schema inference + let ndjson = records_to_infer.join("\n"); + let cursor = std::io::Cursor::new(ndjson.as_bytes()); + + let (schema, _) = infer_json_schema(cursor, Some(max_records))?; + Ok(schema) } #[async_trait] @@ -261,7 +324,7 @@ impl FileFormat for JsonFormat { .schema_infer_max_rec .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD); let file_compression_type = FileCompressionType::from(self.options.compression); - let is_array_format = self.options.format_array; + let newline_delimited = self.options.newline_delimited; for object in objects { let mut take_while = || { @@ -279,13 +342,16 @@ impl FileFormat for JsonFormat { let decoder = file_compression_type.convert_read(file)?; let mut reader = BufReader::new(decoder); - if is_array_format { - infer_json_schema_from_json_array(&mut reader, records_to_read)? - } else { + if newline_delimited { let iter = ValueIter::new(&mut reader, None); infer_json_schema_from_iterator( iter.take_while(|_| take_while()), )? + } else { + // JSON array format: read content and extract records + let mut content = String::new(); + reader.read_to_string(&mut content)?; + infer_schema_from_json_array_content(&content, records_to_read)? } } GetResultPayload::Stream(_) => { @@ -293,13 +359,16 @@ impl FileFormat for JsonFormat { let decoder = file_compression_type.convert_read(data.reader())?; let mut reader = BufReader::new(decoder); - if is_array_format { - infer_json_schema_from_json_array(&mut reader, records_to_read)? - } else { + if newline_delimited { let iter = ValueIter::new(&mut reader, None); infer_json_schema_from_iterator( iter.take_while(|_| take_while()), )? + } else { + // JSON array format: read content and extract records + let mut content = String::new(); + reader.read_to_string(&mut content)?; + infer_schema_from_json_array_content(&content, records_to_read)? } } }; @@ -357,7 +426,8 @@ impl FileFormat for JsonFormat { fn file_source(&self, table_schema: TableSchema) -> Arc { Arc::new( - JsonSource::new(table_schema).with_format_array(self.options.format_array), + JsonSource::new(table_schema) + .with_newline_delimited(self.options.newline_delimited), ) } } diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 429e4a189aed4..40113b0af9921 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Execution plan for reading line-delimited JSON files +//! Execution plan for reading JSON files (line-delimited and array formats) use std::any::Any; use std::io::{BufReader, Read, Seek, SeekFrom}; @@ -55,24 +55,26 @@ pub struct JsonOpener { projected_schema: SchemaRef, file_compression_type: FileCompressionType, object_store: Arc, - format_array: bool, + /// When `true` (default), expects newline-delimited JSON (NDJSON). + /// When `false`, expects JSON array format `[{...}, {...}]`. 
+ newline_delimited: bool, } impl JsonOpener { - /// Returns a [`JsonOpener`] + /// Returns a [`JsonOpener`] pub fn new( batch_size: usize, projected_schema: SchemaRef, file_compression_type: FileCompressionType, object_store: Arc, - format_array: bool, + newline_delimited: bool, ) -> Self { Self { batch_size, projected_schema, file_compression_type, object_store, - format_array, + newline_delimited, } } } @@ -84,7 +86,9 @@ pub struct JsonSource { batch_size: Option, metrics: ExecutionPlanMetricsSet, projection: SplitProjection, - format_array: bool, + /// When `true` (default), expects newline-delimited JSON (NDJSON). + /// When `false`, expects JSON array format `[{...}, {...}]`. + newline_delimited: bool, } impl JsonSource { @@ -96,13 +100,16 @@ impl JsonSource { table_schema, batch_size: None, metrics: ExecutionPlanMetricsSet::new(), - format_array: false, + newline_delimited: true, } } - /// Set whether to expect JSON array format - pub fn with_format_array(mut self, format_array: bool) -> Self { - self.format_array = format_array; + /// Set whether to read as newline-delimited JSON. + /// + /// When `true` (default), expects newline-delimited format. + /// When `false`, expects JSON array format `[{...}, {...}]`. + pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self { + self.newline_delimited = newline_delimited; self } } @@ -132,7 +139,7 @@ impl FileSource for JsonSource { projected_schema, file_compression_type: base_config.file_compression_type, object_store, - format_array: self.format_array, + newline_delimited: self.newline_delimited, }) as Arc; // Wrap with ProjectionOpener @@ -185,7 +192,7 @@ impl FileSource for JsonSource { } impl FileOpener for JsonOpener { - /// Open a partitioned NDJSON file. + /// Open a partitioned JSON file. /// /// If `file_meta.range` is `None`, the entire file is opened. /// Else `file_meta.range` is `Some(FileRange{start, end})`, which corresponds to the byte range [start, end) within the file. @@ -194,18 +201,20 @@ impl FileOpener for JsonOpener { /// are applied to determine which lines to read: /// 1. The first line of the partition is the line in which the index of the first character >= `start`. /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides. + /// + /// Note: JSON array format does not support range-based scanning. fn open(&self, partitioned_file: PartitionedFile) -> Result { let store = Arc::clone(&self.object_store); let schema = Arc::clone(&self.projected_schema); let batch_size = self.batch_size; let file_compression_type = self.file_compression_type.to_owned(); - let format_array = self.format_array; + let newline_delimited = self.newline_delimited; // JSON array format requires reading the complete file - if format_array && partitioned_file.range.is_some() { + if !newline_delimited && partitioned_file.range.is_some() { return Err(DataFusionError::NotImplemented( "JSON array format does not support range-based file scanning. \ - Disable repartition_file_scans or use line-delimited JSON format." + Disable repartition_file_scans or use newline-delimited JSON format." 
                    .to_string(),
            ));
        }
@@ -245,26 +254,40 @@ impl FileOpener for JsonOpener {
                        }
                    };
 
-                    if format_array {
-                        // Handle JSON array format
-                        let batches = read_json_array_to_batches(
-                            BufReader::new(bytes),
-                            schema,
-                            batch_size,
-                        )?;
-                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
-                    } else {
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) reader
                         let reader = ReaderBuilder::new(schema)
                             .with_batch_size(batch_size)
                             .build(BufReader::new(bytes))?;
                         Ok(futures::stream::iter(reader)
                             .map(|r| r.map_err(Into::into))
                             .boxed())
+                    } else {
+                        // JSON array format reader
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(bytes),
+                            schema,
+                            batch_size,
+                        )?;
+                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
                     }
                 }
                 GetResultPayload::Stream(s) => {
-                    if format_array {
-                        // For streaming, we need to collect all bytes first
+                    if newline_delimited {
+                        // Newline-delimited JSON (NDJSON) streaming reader
+                        let s = s.map_err(DataFusionError::from);
+                        let decoder = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build_decoder()?;
+                        let input =
+                            file_compression_type.convert_stream(s.boxed())?.fuse();
+                        let stream = deserialize_stream(
+                            input,
+                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
+                        );
+                        Ok(stream.map_err(Into::into).boxed())
+                    } else {
+                        // JSON array format: collect all bytes first
                         let bytes = s
                             .map_err(DataFusionError::from)
                             .try_fold(Vec::new(), |mut acc, chunk| async move {
@@ -280,18 +303,6 @@ impl FileOpener for JsonOpener {
                                 batch_size,
                             )?;
                         Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
-                    } else {
-                        let s = s.map_err(DataFusionError::from);
-                        let decoder = ReaderBuilder::new(schema)
-                            .with_batch_size(batch_size)
-                            .build_decoder()?;
-                        let input =
-                            file_compression_type.convert_stream(s.boxed())?.fuse();
-                        let stream = deserialize_stream(
-                            input,
-                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                        );
-                        Ok(stream.map_err(Into::into).boxed())
                     }
                 }
             }
@@ -299,14 +310,15 @@ impl FileOpener for JsonOpener {
     }
 }
 
-/// Read JSON array format and convert to RecordBatches
+/// Read JSON array format and convert to RecordBatches.
+///
+/// Parses a JSON array `[{...}, {...}, ...]` and converts each object
+/// to Arrow RecordBatches using the provided schema.
 fn read_json_array_to_batches<R: Read>(
     mut reader: R,
     schema: SchemaRef,
     batch_size: usize,
 ) -> Result<Vec<RecordBatch>> {
-    use arrow::json::ReaderBuilder;
-
     let mut content = String::new();
     reader.read_to_string(&mut content)?;
 
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index eb8b1664744c0..fa6cb17152576 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -469,7 +469,7 @@ message JsonOptions {
   CompressionTypeVariant compression = 1; // Compression type
   optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
   optional uint32 compression_level = 3; // Optional compression level
-  bool format_array = 4; // Whether the JSON is in array format [{},...] (default false = line-delimited)
+  bool newline_delimited = 4; // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...]
} message TableParquetOptions { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 27acece6d3fd9..b98c48b30ba95 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1105,7 +1105,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions { compression: compression.into(), compression_level: proto_opts.compression_level, schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize), - format_array: proto_opts.format_array, + newline_delimited: proto_opts.newline_delimited, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 352199c6693f9..cf36bb335f4ca 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -4589,7 +4589,7 @@ impl serde::Serialize for JsonOptions { if self.compression_level.is_some() { len += 1; } - if self.format_array { + if self.newline_delimited { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion_common.JsonOptions", len)?; @@ -4606,8 +4606,8 @@ impl serde::Serialize for JsonOptions { if let Some(v) = self.compression_level.as_ref() { struct_ser.serialize_field("compressionLevel", v)?; } - if self.format_array { - struct_ser.serialize_field("formatArray", &self.format_array)?; + if self.newline_delimited { + struct_ser.serialize_field("newlineDelimited", &self.newline_delimited)?; } struct_ser.end() } @@ -4624,8 +4624,8 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "schemaInferMaxRec", "compression_level", "compressionLevel", - "format_array", - "formatArray", + "newline_delimited", + "newlineDelimited", ]; #[allow(clippy::enum_variant_names)] @@ -4633,7 +4633,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { Compression, SchemaInferMaxRec, CompressionLevel, - FormatArray, + NewlineDelimited, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -4658,7 +4658,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "compression" => Ok(GeneratedField::Compression), "schemaInferMaxRec" | "schema_infer_max_rec" => Ok(GeneratedField::SchemaInferMaxRec), "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel), - "formatArray" | "format_array" => Ok(GeneratedField::FormatArray), + "newlineDelimited" | "newline_delimited" => Ok(GeneratedField::NewlineDelimited), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4681,7 +4681,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { let mut compression__ = None; let mut schema_infer_max_rec__ = None; let mut compression_level__ = None; - let mut format_array__ = None; + let mut newline_delimited__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Compression => { @@ -4706,11 +4706,11 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } - GeneratedField::FormatArray => { - if format_array__.is_some() { - return Err(serde::de::Error::duplicate_field("formatArray")); + GeneratedField::NewlineDelimited => { + if newline_delimited__.is_some() { + return Err(serde::de::Error::duplicate_field("newlineDelimited")); } - format_array__ = Some(map_.next_value()?); + newline_delimited__ = Some(map_.next_value()?); } } } @@ -4718,7 +4718,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { compression: compression__.unwrap_or_default(), schema_infer_max_rec: schema_infer_max_rec__, compression_level: compression_level__, - format_array: format_array__.unwrap_or_default(), + newline_delimited: newline_delimited__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index afdb6628225b2..a1d4f5ce25425 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -665,9 +665,9 @@ pub struct JsonOptions { /// Optional compression level #[prost(uint32, optional, tag = "3")] pub compression_level: ::core::option::Option, - /// Whether the JSON is in array format \[{},...\] (default false = line-delimited) + /// Whether to read as newline-delimited JSON (default true). When false, expects JSON array format \[{},...\] #[prost(bool, tag = "4")] - pub format_array: bool, + pub newline_delimited: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index ef7563230f49d..ddb6da9848166 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -990,7 +990,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions { compression: compression.into(), schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64), compression_level: opts.compression_level, - format_array: opts.format_array, + newline_delimited: opts.newline_delimited, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index afdb6628225b2..a1d4f5ce25425 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -665,9 +665,9 @@ pub struct JsonOptions { /// Optional compression level #[prost(uint32, optional, tag = "3")] pub compression_level: ::core::option::Option, - /// Whether the JSON is in array format \[{},...\] (default false = line-delimited) + /// Whether to read as newline-delimited JSON (default true). 
When false, expects JSON array format \[{},...\] #[prost(bool, tag = "4")] - pub format_array: bool, + pub newline_delimited: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index e753e34462d86..adad64162f3c1 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -241,7 +241,7 @@ impl JsonOptionsProto { compression: options.compression as i32, schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64), compression_level: options.compression_level, - format_array: options.format_array, + newline_delimited: options.newline_delimited, } } else { JsonOptionsProto::default() @@ -261,7 +261,7 @@ impl From<&JsonOptionsProto> for JsonOptions { }, schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize), compression_level: proto.compression_level, - format_array: proto.format_array, + newline_delimited: proto.newline_delimited, } } } diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt index 4442a6a2d5af2..60bec4213db02 100644 --- a/datafusion/sqllogictest/test_files/json.slt +++ b/datafusion/sqllogictest/test_files/json.slt @@ -151,11 +151,11 @@ physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/ ## JSON Array Format Tests ########## -# Test reading JSON array format file with format_array=true +# Test reading JSON array format file with newline_delimited=false statement ok CREATE EXTERNAL TABLE json_array_test STORED AS JSON -OPTIONS ('format.format_array' 'true') +OPTIONS ('format.newline_delimited' 'false') LOCATION '../core/tests/data/json_array.json'; query IT rowsort @@ -168,9 +168,9 @@ SELECT a, b FROM json_array_test statement ok DROP TABLE json_array_test; -# Test that reading JSON array format WITHOUT format_array option fails -# (default is line-delimited mode which can't parse array format correctly) +# Test that reading JSON array format WITHOUT newline_delimited option fails +# (default is newline_delimited=true which can't parse array format correctly) statement error Not valid JSON CREATE EXTERNAL TABLE json_array_as_ndjson STORED AS JSON -LOCATION '../core/tests/data/json_array.json'; \ No newline at end of file +LOCATION '../core/tests/data/json_array.json'; From 796d2c2de4973fc1cb3a432d7cabde592effed68 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Fri, 23 Jan 2026 22:14:17 +0800 Subject: [PATCH 3/3] better testing --- .../core/src/datasource/file_format/json.rs | 632 +++++------------- 1 file changed, 164 insertions(+), 468 deletions(-) diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index 56c2a2b28a80a..648f100796a62 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -54,6 +54,47 @@ mod tests { use regex::Regex; use rstest::rstest; + // ==================== Test Helpers ==================== + + /// Create a temporary JSON file and return (TempDir, path) + fn create_temp_json(content: &str) -> (tempfile::TempDir, String) { + let tmp_dir = tempfile::TempDir::new().unwrap(); + let path = format!("{}/test.json", tmp_dir.path().to_string_lossy()); + std::fs::write(&path, content).unwrap(); + (tmp_dir, path) + } + + /// Infer schema from JSON array format file + async fn infer_json_array_schema( + content: &str, 
+    ) -> Result<SchemaRef> {
+        let (_tmp_dir, path) = create_temp_json(content);
+        let session = SessionContext::new();
+        let ctx = session.state();
+        let store = Arc::new(LocalFileSystem::new()) as _;
+        let format = JsonFormat::default().with_newline_delimited(false);
+        format
+            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
+            .await
+    }
+
+    /// Register a JSON array table and run a query
+    async fn query_json_array(content: &str, query: &str) -> Result<Vec<RecordBatch>> {
+        let (_tmp_dir, path) = create_temp_json(content);
+        let ctx = SessionContext::new();
+        let options = NdJsonReadOptions::default().newline_delimited(false);
+        ctx.register_json("test_table", &path, options).await?;
+        ctx.sql(query).await?.collect().await
+    }
+
+    /// Register a JSON array table and run a query, return formatted string
+    async fn query_json_array_str(content: &str, query: &str) -> Result<String> {
+        let result = query_json_array(content, query).await?;
+        Ok(batches_to_string(&result))
+    }
+
+    // ==================== Existing Tests ====================
+
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
         let config = SessionConfig::new().with_batch_size(2);
@@ -315,7 +356,6 @@ mod tests {
             .digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into());
 
         let mut all_batches = RecordBatch::new_empty(schema.clone());
-        // We get RequiresMoreData after 2 batches because of how json::Decoder works
         for _ in 0..2 {
             let output = deserializer.next()?;
             let DeserializerOutput::RecordBatch(batch) = output else {
@@ -359,7 +399,6 @@ mod tests {
         let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
         df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
             .await?;
-        // Expected the file to exist and be empty
         assert!(std::path::Path::new(&path).exists());
         let metadata = std::fs::metadata(&path)?;
         assert_eq!(metadata.len(), 0);
         Ok(())
     }
@@ -386,553 +425,210 @@ mod tests {
         let df = ctx.read_batch(empty_batch.clone())?;
         df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
             .await?;
-        // Expected the file to exist and be empty
         assert!(std::path::Path::new(&path).exists());
         let metadata = std::fs::metadata(&path)?;
         assert_eq!(metadata.len(), 0);
         Ok(())
     }
 
-    #[tokio::test]
-    async fn test_json_array_format() -> Result<()> {
-        let session = SessionContext::new();
-        let ctx = session.state();
-        let store = Arc::new(LocalFileSystem::new()) as _;
+    // ==================== JSON Array Format Tests ====================
 
-        // Create a temporary file with JSON array format
-        let tmp_dir = tempfile::TempDir::new()?;
-        let path = format!("{}/array.json", tmp_dir.path().to_string_lossy());
-        std::fs::write(
-            &path,
-            r#"[
-                {"a": 1, "b": 2.0, "c": true},
-                {"a": 2, "b": 3.5, "c": false},
-                {"a": 3, "b": 4.0, "c": true}
-            ]"#,
-        )?;
-
-        // Test with newline_delimited = false (JSON array format)
-        let format = JsonFormat::default().with_newline_delimited(false);
-        let file_schema = format
-            .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
-            .await
-            .expect("Schema inference");
+    #[tokio::test]
+    async fn test_json_array_schema_inference() -> Result<()> {
+        let schema = infer_json_array_schema(
+            r#"[{"a": 1, "b": 2.0, "c": true}, {"a": 2, "b": 3.5, "c": false}]"#,
+        )
+        .await?;
 
-        let fields = file_schema
+        let fields: Vec<_> = schema
             .fields()
             .iter()
             .map(|f| format!("{}: {:?}", f.name(), f.data_type()))
-            .collect::<Vec<_>>();
+            .collect();
         assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
-
         Ok(())
     }
 
     #[tokio::test]
-    async fn test_json_array_format_empty() -> Result<()> {
let session = SessionContext::new(); - let ctx = session.state(); - let store = Arc::new(LocalFileSystem::new()) as _; - - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/empty_array.json", tmp_dir.path().to_string_lossy()); - std::fs::write(&path, "[]")?; - - let format = JsonFormat::default().with_newline_delimited(false); - let file_schema = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) - .await - .expect("Schema inference for empty array"); - - // Empty array should return empty schema - assert_eq!(file_schema.fields().len(), 0); - + async fn test_json_array_empty() -> Result<()> { + let schema = infer_json_array_schema("[]").await?; + assert_eq!(schema.fields().len(), 0); Ok(()) } #[tokio::test] - async fn test_json_array_format_with_limit() -> Result<()> { - let session = SessionContext::new(); - let ctx = session.state(); - let store = Arc::new(LocalFileSystem::new()) as _; - - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/array_limit.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"a": 1}, - {"a": 2, "b": "extra"} - ]"#, - )?; - - // Only infer from first record - let format = JsonFormat::default() - .with_newline_delimited(false) - .with_schema_infer_max_rec(1); - - let file_schema = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) - .await - .expect("Schema inference"); - - // Should only have field "a" since we limited to 1 record - let fields = file_schema - .fields() - .iter() - .map(|f| format!("{}: {:?}", f.name(), f.data_type())) - .collect::>(); - assert_eq!(vec!["a: Int64"], fields); - - Ok(()) - } - - #[tokio::test] - async fn test_json_array_format_read_data() -> Result<()> { - let session = SessionContext::new(); - let ctx = session.state(); - let task_ctx = ctx.task_ctx(); - let store = Arc::new(LocalFileSystem::new()) as _; - - // Create a temporary file with JSON array format - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"a": 1, "b": 2.0, "c": true}, - {"a": 2, "b": 3.5, "c": false}, - {"a": 3, "b": 4.0, "c": true} - ]"#, - )?; - - let format = JsonFormat::default().with_newline_delimited(false); - - // Infer schema - let file_schema = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) - .await?; - - // Scan and read data - let exec = scan_format( - &ctx, - &format, - Some(file_schema), - tmp_dir.path().to_str().unwrap(), - "array.json", - None, - None, + async fn test_json_array_nested_struct() -> Result<()> { + let schema = infer_json_array_schema( + r#"[{"id": 1, "info": {"name": "Alice", "age": 30}}]"#, ) .await?; - let batches = collect(exec, task_ctx).await?; - - assert_eq!(1, batches.len()); - assert_eq!(3, batches[0].num_columns()); - assert_eq!(3, batches[0].num_rows()); - - // Verify data - let array_a = as_int64_array(batches[0].column(0))?; - assert_eq!( - vec![1, 2, 3], - (0..3).map(|i| array_a.value(i)).collect::>() - ); + let info_field = schema.field_with_name("info").unwrap(); + assert!(matches!(info_field.data_type(), DataType::Struct(_))); Ok(()) } #[tokio::test] - async fn test_json_array_format_with_projection() -> Result<()> { - let session = SessionContext::new(); - let ctx = session.state(); - let task_ctx = ctx.task_ctx(); - let store = Arc::new(LocalFileSystem::new()) as _; - - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); - 
std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#)?; - - let format = JsonFormat::default().with_newline_delimited(false); - let file_schema = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) - .await?; - - // Project only column "a" - let exec = scan_format( - &ctx, - &format, - Some(file_schema), - tmp_dir.path().to_str().unwrap(), - "array.json", - Some(vec![0]), - None, - ) - .await?; - let batches = collect(exec, task_ctx).await?; - - assert_eq!(1, batches.len()); - assert_eq!(1, batches[0].num_columns()); // Only 1 column projected - assert_eq!(2, batches[0].num_rows()); + async fn test_json_array_list_type() -> Result<()> { + let schema = + infer_json_array_schema(r#"[{"id": 1, "tags": ["a", "b", "c"]}]"#).await?; + let tags_field = schema.field_with_name("tags").unwrap(); + assert!(matches!(tags_field.data_type(), DataType::List(_))); Ok(()) } #[tokio::test] - async fn test_ndjson_read_options_newline_delimited() -> Result<()> { - let ctx = SessionContext::new(); - - // Create a temporary file with JSON array format - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"a": 1, "b": "hello"}, - {"a": 2, "b": "world"}, - {"a": 3, "b": "test"} - ]"#, - )?; - - // Use NdJsonReadOptions with newline_delimited = false (JSON array format) - let options = NdJsonReadOptions::default().newline_delimited(false); - - ctx.register_json("json_array_table", &path, options) - .await?; - - let result = ctx - .sql("SELECT a, b FROM json_array_table ORDER BY a") - .await? - .collect() - .await?; - - assert_snapshot!(batches_to_string(&result), @r" - +---+-------+ - | a | b | - +---+-------+ - | 1 | hello | - | 2 | world | - | 3 | test | - +---+-------+ - "); + async fn test_json_array_basic_query() -> Result<()> { + let result = query_json_array_str( + r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}, {"a": 3, "b": "test"}]"#, + "SELECT a, b FROM test_table ORDER BY a", + ) + .await?; + assert_snapshot!(result, @r" + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + | 3 | test | + +---+-------+ + "); Ok(()) } #[tokio::test] - async fn test_ndjson_read_options_json_array_with_compression() -> Result<()> { - use flate2::Compression; - use flate2::write::GzEncoder; - use std::io::Write; - - let ctx = SessionContext::new(); - - // Create a temporary gzip compressed JSON array file - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy()); - - let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#; - let file = std::fs::File::create(&path)?; - let mut encoder = GzEncoder::new(file, Compression::default()); - encoder.write_all(json_content.as_bytes())?; - encoder.finish()?; - - // Use NdJsonReadOptions with newline_delimited = false and GZIP compression - let options = NdJsonReadOptions::default() - .newline_delimited(false) - .file_compression_type(FileCompressionType::GZIP) - .file_extension(".json.gz"); - - ctx.register_json("json_array_gzip", &path, options).await?; - - let result = ctx - .sql("SELECT a, b FROM json_array_gzip ORDER BY a") - .await? 
- .collect() + async fn test_json_array_with_nulls() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "name": "Alice"}, {"id": 2, "name": null}, {"id": 3, "name": "Charlie"}]"#, + "SELECT id, name FROM test_table ORDER BY id", + ) .await?; - assert_snapshot!(batches_to_string(&result), @r" - +---+-------+ - | a | b | - +---+-------+ - | 1 | hello | - | 2 | world | - +---+-------+ - "); - + assert_snapshot!(result, @r" + +----+---------+ + | id | name | + +----+---------+ + | 1 | Alice | + | 2 | | + | 3 | Charlie | + +----+---------+ + "); Ok(()) } #[tokio::test] - async fn test_json_array_format_with_nested_struct() -> Result<()> { - let session = SessionContext::new(); - let ctx = session.state(); - let task_ctx = ctx.task_ctx(); - let store = Arc::new(LocalFileSystem::new()) as _; - - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/nested.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"id": 1, "info": {"name": "Alice", "age": 30}}, - {"id": 2, "info": {"name": "Bob", "age": 25}}, - {"id": 3, "info": {"name": "Charlie", "age": 35}} - ]"#, - )?; - - let format = JsonFormat::default().with_newline_delimited(false); - let file_schema = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) - .await?; - - // Verify nested struct in schema - let info_field = file_schema.field_with_name("info").unwrap(); - assert!(matches!(info_field.data_type(), DataType::Struct(_))); - - let exec = scan_format( - &ctx, - &format, - Some(file_schema), - tmp_dir.path().to_str().unwrap(), - "nested.json", - None, - None, + async fn test_json_array_unnest() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]"#, + "SELECT id, unnest(values) as value FROM test_table ORDER BY id, value", ) .await?; - let batches = collect(exec, task_ctx).await?; - - assert_eq!(1, batches.len()); - assert_eq!(3, batches[0].num_rows()); + assert_snapshot!(result, @r" + +----+-------+ + | id | value | + +----+-------+ + | 1 | 10 | + | 1 | 20 | + | 1 | 30 | + | 2 | 40 | + | 2 | 50 | + +----+-------+ + "); Ok(()) } #[tokio::test] - async fn test_json_array_format_with_list() -> Result<()> { - let session = SessionContext::new(); - let ctx = session.state(); - let task_ctx = ctx.task_ctx(); - let store = Arc::new(LocalFileSystem::new()) as _; - - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/list.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"id": 1, "tags": ["a", "b", "c"]}, - {"id": 2, "tags": ["d", "e"]}, - {"id": 3, "tags": ["f"]} - ]"#, - )?; - - let format = JsonFormat::default().with_newline_delimited(false); - let file_schema = format - .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) - .await?; - - // Verify list type in schema - let tags_field = file_schema.field_with_name("tags").unwrap(); - assert!(matches!(tags_field.data_type(), DataType::List(_))); - - let exec = scan_format( - &ctx, - &format, - Some(file_schema), - tmp_dir.path().to_str().unwrap(), - "list.json", - None, - None, + async fn test_json_array_unnest_struct() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": "B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#, + "SELECT id, unnest(orders)['product'] as product, unnest(orders)['qty'] as qty FROM test_table ORDER BY id, product", ) - .await?; - let batches = collect(exec, task_ctx).await?; - - 
assert_eq!(1, batches.len()); - assert_eq!(3, batches[0].num_rows()); - - Ok(()) - } - - #[tokio::test] - async fn test_json_array_format_with_list_of_structs() -> Result<()> { - let ctx = SessionContext::new(); - - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/list_struct.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"id": 1, "items": [{"name": "item1", "price": 10.5}, {"name": "item2", "price": 20.0}]}, - {"id": 2, "items": [{"name": "item3", "price": 15.0}]}, - {"id": 3, "items": []} - ]"#, - )?; - - let options = NdJsonReadOptions::default().newline_delimited(false); - ctx.register_json("list_struct_table", &path, options) - .await?; - - // Query nested struct fields - let result = ctx - .sql("SELECT id, items FROM list_struct_table ORDER BY id") - .await? - .collect() .await?; - assert_eq!(1, result.len()); - assert_eq!(3, result[0].num_rows()); - + assert_snapshot!(result, @r" + +----+---------+-----+ + | id | product | qty | + +----+---------+-----+ + | 1 | A | 2 | + | 1 | B | 3 | + | 2 | C | 1 | + +----+---------+-----+ + "); Ok(()) } #[tokio::test] - async fn test_json_array_format_with_unnest() -> Result<()> { - let ctx = SessionContext::new(); - - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/unnest.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"id": 1, "values": [10, 20, 30]}, - {"id": 2, "values": [40, 50]}, - {"id": 3, "values": [60]} - ]"#, - )?; - - let options = NdJsonReadOptions::default().newline_delimited(false); - ctx.register_json("unnest_table", &path, options).await?; - - // Test UNNEST on array column - let result = ctx - .sql( - "SELECT id, unnest(values) as value FROM unnest_table ORDER BY id, value", - ) - .await? - .collect() + async fn test_json_array_nested_struct_access() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "dept": {"name": "Engineering", "head": "Alice"}}, {"id": 2, "dept": {"name": "Sales", "head": "Bob"}}]"#, + "SELECT id, dept['name'] as dept_name, dept['head'] as head FROM test_table ORDER BY id", + ) .await?; - assert_snapshot!(batches_to_string(&result), @r" - +----+-------+ - | id | value | - +----+-------+ - | 1 | 10 | - | 1 | 20 | - | 1 | 30 | - | 2 | 40 | - | 2 | 50 | - | 3 | 60 | - +----+-------+ - "); - + assert_snapshot!(result, @r" + +----+-------------+-------+ + | id | dept_name | head | + +----+-------------+-------+ + | 1 | Engineering | Alice | + | 2 | Sales | Bob | + +----+-------------+-------+ + "); Ok(()) } #[tokio::test] - async fn test_json_array_format_with_unnest_struct() -> Result<()> { - let ctx = SessionContext::new(); + async fn test_json_array_with_compression() -> Result<()> { + use flate2::Compression; + use flate2::write::GzEncoder; + use std::io::Write; let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/unnest_struct.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": "B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#, - )?; - - let options = NdJsonReadOptions::default().newline_delimited(false); - ctx.register_json("unnest_struct_table", &path, options) - .await?; - - // Test UNNEST on List column and access struct fields - let result = ctx - .sql( - "SELECT id, unnest(orders)['product'] as product, unnest(orders)['qty'] as qty - FROM unnest_struct_table - ORDER BY id, product" - ) - .await? 
- .collect() - .await?; - - assert_snapshot!(batches_to_string(&result), @r" - +----+---------+-----+ - | id | product | qty | - +----+---------+-----+ - | 1 | A | 2 | - | 1 | B | 3 | - | 2 | C | 1 | - +----+---------+-----+ - "); - - Ok(()) - } - - #[tokio::test] - async fn test_json_array_format_deeply_nested() -> Result<()> { - let ctx = SessionContext::new(); + let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy()); - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/deep_nested.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[{"id": 1, "department": {"name": "Engineering", "head": "Alice"}}, {"id": 2, "department": {"name": "Sales", "head": "Bob"}}]"#, + let file = std::fs::File::create(&path)?; + let mut encoder = GzEncoder::new(file, Compression::default()); + encoder.write_all( + r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#.as_bytes(), )?; + encoder.finish()?; - let options = NdJsonReadOptions::default().newline_delimited(false); - ctx.register_json("deep_nested_table", &path, options) - .await?; + let ctx = SessionContext::new(); + let options = NdJsonReadOptions::default() + .newline_delimited(false) + .file_compression_type(FileCompressionType::GZIP) + .file_extension(".json.gz"); - // Query nested struct data + ctx.register_json("test_table", &path, options).await?; let result = ctx - .sql("SELECT id, department['name'] as dept_name, department['head'] as dept_head FROM deep_nested_table ORDER BY id") + .sql("SELECT a, b FROM test_table ORDER BY a") .await? .collect() .await?; assert_snapshot!(batches_to_string(&result), @r" - +----+-------------+-----------+ - | id | dept_name | dept_head | - +----+-------------+-----------+ - | 1 | Engineering | Alice | - | 2 | Sales | Bob | - +----+-------------+-----------+ - "); - + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + +---+-------+ + "); Ok(()) } #[tokio::test] - async fn test_json_array_format_with_null_values() -> Result<()> { - let ctx = SessionContext::new(); - - let tmp_dir = tempfile::TempDir::new()?; - let path = format!("{}/nulls.json", tmp_dir.path().to_string_lossy()); - std::fs::write( - &path, - r#"[ - {"id": 1, "name": "Alice", "score": 100}, - {"id": 2, "name": null, "score": 85}, - {"id": 3, "name": "Charlie", "score": null} - ]"#, - )?; - - let options = NdJsonReadOptions::default().newline_delimited(false); - ctx.register_json("null_table", &path, options).await?; - - let result = ctx - .sql("SELECT id, name, score FROM null_table ORDER BY id") - .await? - .collect() + async fn test_json_array_list_of_structs() -> Result<()> { + let batches = query_json_array( + r#"[{"id": 1, "items": [{"name": "x", "price": 10.5}]}, {"id": 2, "items": []}]"#, + "SELECT id, items FROM test_table ORDER BY id", + ) .await?; - assert_snapshot!(batches_to_string(&result), @r" - +----+---------+-------+ - | id | name | score | - +----+---------+-------+ - | 1 | Alice | 100 | - | 2 | | 85 | - | 3 | Charlie | | - +----+---------+-------+ - "); - + assert_eq!(1, batches.len()); + assert_eq!(2, batches[0].num_rows()); Ok(()) } }
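---

For reviewers trying the feature out, here is a minimal, self-contained sketch of the new option as it is exercised by the tests above. The table name and file path are illustrative only, and it assumes the `newline_delimited(false)` builder on `NdJsonReadOptions` introduced in this series:

```rust
use datafusion::error::Result;
use datafusion::prelude::{NdJsonReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // `newline_delimited(false)` switches the reader from NDJSON to
    // JSON array input: [{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]
    let options = NdJsonReadOptions::default().newline_delimited(false);

    // Illustrative path; any JSON array file works, e.g. the
    // `json_array.json` fixture added by this series.
    ctx.register_json("events", "tests/data/json_array.json", options)
        .await?;

    ctx.sql("SELECT a, b FROM events ORDER BY a")
        .await?
        .show()
        .await?;
    Ok(())
}
```

The same behaviour is reachable from SQL with `CREATE EXTERNAL TABLE ... STORED AS JSON OPTIONS ('format.newline_delimited' 'false')`, as covered by the `json.slt` cases in this series.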