diff --git a/Cargo.lock b/Cargo.lock index 3dc276d7c2310..fa27e6977c624 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2033,6 +2033,7 @@ dependencies = [ "datafusion-session", "futures", "object_store", + "serde_json", "tokio", ] diff --git a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs index 347f1a0464716..27acf508563ba 100644 --- a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs +++ b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs @@ -125,6 +125,7 @@ async fn json_opener() -> Result<()> { projected, FileCompressionType::UNCOMPRESSED, Arc::new(object_store), + false, ); let scan_config = FileScanConfigBuilder::new( diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 87344914d2f7e..8f2a07a83385c 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -3065,6 +3065,22 @@ config_namespace! { /// If not specified, the default level for the compression algorithm is used. pub compression_level: Option, default = None pub schema_infer_max_rec: Option, default = None + /// The format of JSON input files. + /// + /// When `false` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `true`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub format_array: bool, default = false } } diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index cb2e9d787ee92..35b3acb7a603d 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -47,6 +47,7 @@ mod tests { use datafusion_common::stats::Precision; use datafusion_common::Result; + use datafusion_datasource::file_compression_type::FileCompressionType; use futures::StreamExt; use insta::assert_snapshot; use object_store::local::LocalFileSystem; @@ -391,4 +392,276 @@ mod tests { assert_eq!(metadata.len(), 0); Ok(()) } + + #[tokio::test] + async fn test_json_array_format() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let store = Arc::new(LocalFileSystem::new()) as _; + + // Create a temporary file with JSON array format + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"a": 1, "b": 2.0, "c": true}, + {"a": 2, "b": 3.5, "c": false}, + {"a": 3, "b": 4.0, "c": true} + ]"#, + )?; + + // Test with format_array = true + let format = JsonFormat::default().with_format_array(true); + let file_schema = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await + .expect("Schema inference"); + + let fields = file_schema + .fields() + .iter() + .map(|f| format!("{}: {:?}", f.name(), f.data_type())) + .collect::>(); + assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_empty() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let store = Arc::new(LocalFileSystem::new()) as _; + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/empty_array.json", tmp_dir.path().to_string_lossy()); + std::fs::write(&path, "[]")?; + + let format = 
JsonFormat::default().with_format_array(true); + let result = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await; + + assert!(result.is_err()); + assert!( + result + .unwrap_err() + .to_string() + .contains("JSON array is empty") + ); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_with_limit() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let store = Arc::new(LocalFileSystem::new()) as _; + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array_limit.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"a": 1}, + {"a": 2, "b": "extra"} + ]"#, + )?; + + // Only infer from first record + let format = JsonFormat::default() + .with_format_array(true) + .with_schema_infer_max_rec(1); + + let file_schema = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await + .expect("Schema inference"); + + // Should only have field "a" since we limited to 1 record + let fields = file_schema + .fields() + .iter() + .map(|f| format!("{}: {:?}", f.name(), f.data_type())) + .collect::>(); + assert_eq!(vec!["a: Int64"], fields); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_read_data() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let task_ctx = ctx.task_ctx(); + let store = Arc::new(LocalFileSystem::new()) as _; + + // Create a temporary file with JSON array format + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"a": 1, "b": 2.0, "c": true}, + {"a": 2, "b": 3.5, "c": false}, + {"a": 3, "b": 4.0, "c": true} + ]"#, + )?; + + let format = JsonFormat::default().with_format_array(true); + + // Infer schema + let file_schema = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await?; + + // Scan and read data + let exec = scan_format( + &ctx, + &format, + Some(file_schema), + tmp_dir.path().to_str().unwrap(), + "array.json", + None, + None, + ) + .await?; + let batches = collect(exec, task_ctx).await?; + + assert_eq!(1, batches.len()); + assert_eq!(3, batches[0].num_columns()); + assert_eq!(3, batches[0].num_rows()); + + // Verify data + let array_a = as_int64_array(batches[0].column(0))?; + assert_eq!( + vec![1, 2, 3], + (0..3).map(|i| array_a.value(i)).collect::>() + ); + + Ok(()) + } + + #[tokio::test] + async fn test_json_array_format_with_projection() -> Result<()> { + let session = SessionContext::new(); + let ctx = session.state(); + let task_ctx = ctx.task_ctx(); + let store = Arc::new(LocalFileSystem::new()) as _; + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); + std::fs::write(&path, r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#)?; + + let format = JsonFormat::default().with_format_array(true); + let file_schema = format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await?; + + // Project only column "a" + let exec = scan_format( + &ctx, + &format, + Some(file_schema), + tmp_dir.path().to_str().unwrap(), + "array.json", + Some(vec![0]), + None, + ) + .await?; + let batches = collect(exec, task_ctx).await?; + + assert_eq!(1, batches.len()); + assert_eq!(1, batches[0].num_columns()); // Only 1 column projected + assert_eq!(2, batches[0].num_rows()); + + Ok(()) + } + + #[tokio::test] + async fn test_ndjson_read_options_format_array() -> Result<()> { + let ctx = 
SessionContext::new(); + + // Create a temporary file with JSON array format + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json", tmp_dir.path().to_string_lossy()); + std::fs::write( + &path, + r#"[ + {"a": 1, "b": "hello"}, + {"a": 2, "b": "world"}, + {"a": 3, "b": "test"} + ]"#, + )?; + + // Use NdJsonReadOptions with format_array = true + let options = NdJsonReadOptions::default().format_array(true); + + ctx.register_json("json_array_table", &path, options) + .await?; + + let result = ctx + .sql("SELECT a, b FROM json_array_table ORDER BY a") + .await? + .collect() + .await?; + + assert_snapshot!(batches_to_string(&result), @r" + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + | 3 | test | + +---+-------+ + "); + + Ok(()) + } + + #[tokio::test] + async fn test_ndjson_read_options_format_array_with_compression() -> Result<()> { + use flate2::Compression; + use flate2::write::GzEncoder; + use std::io::Write; + + let ctx = SessionContext::new(); + + // Create a temporary gzip compressed JSON array file + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy()); + + let json_content = r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#; + let file = std::fs::File::create(&path)?; + let mut encoder = GzEncoder::new(file, Compression::default()); + encoder.write_all(json_content.as_bytes())?; + encoder.finish()?; + + // Use NdJsonReadOptions with format_array and GZIP compression + let options = NdJsonReadOptions::default() + .format_array(true) + .file_compression_type(FileCompressionType::GZIP) + .file_extension(".json.gz"); + + ctx.register_json("json_array_gzip", &path, options).await?; + + let result = ctx + .sql("SELECT a, b FROM json_array_gzip ORDER BY a") + .await? + .collect() + .await?; + + assert_snapshot!(batches_to_string(&result), @r" + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + +---+-------+ + "); + + Ok(()) + } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 146c5f6f5fd0f..8fa9ef28d78bc 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -465,6 +465,9 @@ pub struct NdJsonReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, + /// Whether the JSON file is in array format `[{...}, {...}]` instead of + /// line-delimited format. Defaults to `false`. + pub format_array: bool, } impl Default for NdJsonReadOptions<'_> { @@ -477,6 +480,7 @@ impl Default for NdJsonReadOptions<'_> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], + format_array: false, } } } @@ -529,6 +533,13 @@ impl<'a> NdJsonReadOptions<'a> { self.schema_infer_max_records = schema_infer_max_records; self } + + /// Specify whether the JSON file is in array format `[{...}, {...}]` + /// instead of line-delimited format. 
+ pub fn format_array(mut self, format_array: bool) -> Self { + self.format_array = format_array; + self + } } #[async_trait] @@ -663,7 +674,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { let file_format = JsonFormat::default() .with_options(table_options.json) .with_schema_infer_max_rec(self.schema_infer_max_records) - .with_file_compression_type(self.file_compression_type.to_owned()); + .with_file_compression_type(self.file_compression_type.to_owned()) + .with_format_array(self.format_array); ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) diff --git a/datafusion/core/tests/data/json_array.json b/datafusion/core/tests/data/json_array.json new file mode 100644 index 0000000000000..1a8716dbf4beb --- /dev/null +++ b/datafusion/core/tests/data/json_array.json @@ -0,0 +1,5 @@ +[ + {"a": 1, "b": "hello"}, + {"a": 2, "b": "world"}, + {"a": 3, "b": "test"} +] diff --git a/datafusion/core/tests/data/json_empty_array.json b/datafusion/core/tests/data/json_empty_array.json new file mode 100644 index 0000000000000..fe51488c7066f --- /dev/null +++ b/datafusion/core/tests/data/json_empty_array.json @@ -0,0 +1 @@ +[] diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml index 37fa8d43a0816..168ae8880eee7 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -44,6 +44,7 @@ datafusion-physical-plan = { workspace = true } datafusion-session = { workspace = true } futures = { workspace = true } object_store = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true } # Note: add additional linter rules in lib.rs. diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs index a14458b5acd36..f6b258eb7c078 100644 --- a/datafusion/datasource-json/src/file_format.rs +++ b/datafusion/datasource-json/src/file_format.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions +//! [`JsonFormat`]: Line delimited and array JSON [`FileFormat`] abstractions use std::any::Any; use std::collections::HashMap; use std::fmt; use std::fmt::Debug; -use std::io::BufReader; +use std::io::{BufReader, Read}; use std::sync::Arc; use crate::source::JsonSource; @@ -48,6 +48,7 @@ use datafusion_datasource::file_format::{ use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; +use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::write::BatchSerializer; use datafusion_datasource::write::demux::DemuxedStreamReceiver; use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join; @@ -59,7 +60,6 @@ use datafusion_session::Session; use async_trait::async_trait; use bytes::{Buf, Bytes}; -use datafusion_datasource::source::DataSourceExec; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; #[derive(Default)] @@ -132,7 +132,23 @@ impl Debug for JsonFormatFactory { } } -/// New line delimited JSON `FileFormat` implementation. +/// JSON `FileFormat` implementation supporting both line-delimited and array formats. 
+///
+/// # Supported Formats
+///
+/// ## Line-Delimited JSON (default)
+/// ```text
+/// {"key1": 1, "key2": "val"}
+/// {"key1": 2, "key2": "vals"}
+/// ```
+///
+/// ## JSON Array Format (when `format_array` option is true)
+/// ```text
+/// [
+///   {"key1": 1, "key2": "val"},
+///   {"key1": 2, "key2": "vals"}
+/// ]
+/// ```
 #[derive(Debug, Default)]
 pub struct JsonFormat {
     options: JsonOptions,
@@ -166,6 +182,49 @@ impl JsonFormat {
         self.options.compression = file_compression_type.into();
         self
     }
+
+    /// Set whether to expect JSON array format instead of line-delimited format.
+    ///
+    /// When `true`, expects input like: `[{"a": 1}, {"a": 2}]`
+    /// When `false` (default), expects input like:
+    /// ```text
+    /// {"a": 1}
+    /// {"a": 2}
+    /// ```
+    pub fn with_format_array(mut self, format_array: bool) -> Self {
+        self.options.format_array = format_array;
+        self
+    }
+}
+
+/// Infer schema from a JSON array format file.
+///
+/// This function reads JSON data in array format `[{...}, {...}]` and infers
+/// the Arrow schema from the contained objects.
+fn infer_json_schema_from_json_array<R: Read>(
+    reader: &mut R,
+    max_records: usize,
+) -> std::result::Result<Schema, ArrowError> {
+    let mut content = String::new();
+    reader.read_to_string(&mut content).map_err(|e| {
+        ArrowError::JsonError(format!("Failed to read JSON content: {e}"))
+    })?;
+
+    // Parse as JSON array using serde_json
+    let values: Vec<serde_json::Value> = serde_json::from_str(&content)
+        .map_err(|e| ArrowError::JsonError(format!("Failed to parse JSON array: {e}")))?;
+
+    // Take only max_records for schema inference
+    let values_to_infer: Vec<_> = values.into_iter().take(max_records).collect();
+
+    if values_to_infer.is_empty() {
+        return Err(ArrowError::JsonError(
+            "JSON array is empty, cannot infer schema".to_string(),
+        ));
+    }
+
+    // Use arrow's schema inference on the parsed values
+    infer_json_schema_from_iterator(values_to_infer.into_iter().map(Ok))
 }
 
 #[async_trait]
@@ -202,6 +261,8 @@ impl FileFormat for JsonFormat {
             .schema_infer_max_rec
             .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD);
         let file_compression_type = FileCompressionType::from(self.options.compression);
+        let is_array_format = self.options.format_array;
+
         for object in objects {
             let mut take_while = || {
                 let should_take = records_to_read > 0;
@@ -217,15 +278,29 @@ impl FileFormat for JsonFormat {
                 GetResultPayload::File(file, _) => {
                     let decoder = file_compression_type.convert_read(file)?;
                     let mut reader = BufReader::new(decoder);
-                    let iter = ValueIter::new(&mut reader, None);
-                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+
+                    if is_array_format {
+                        infer_json_schema_from_json_array(&mut reader, records_to_read)?
+                    } else {
+                        let iter = ValueIter::new(&mut reader, None);
+                        infer_json_schema_from_iterator(
+                            iter.take_while(|_| take_while()),
+                        )?
+                    }
                 }
                 GetResultPayload::Stream(_) => {
                     let data = r.bytes().await?;
                     let decoder = file_compression_type.convert_read(data.reader())?;
                     let mut reader = BufReader::new(decoder);
-                    let iter = ValueIter::new(&mut reader, None);
-                    infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+
+                    if is_array_format {
+                        infer_json_schema_from_json_array(&mut reader, records_to_read)?
+                    } else {
+                        let iter = ValueIter::new(&mut reader, None);
+                        infer_json_schema_from_iterator(
+                            iter.take_while(|_| take_while()),
+                        )?
+                    }
                 }
             };
 
@@ -281,7 +356,9 @@ impl FileFormat for JsonFormat {
     }
 
     fn file_source(&self, table_schema: TableSchema) -> Arc<dyn FileSource> {
-        Arc::new(JsonSource::new(table_schema))
+        Arc::new(
+            JsonSource::new(table_schema).with_format_array(self.options.format_array),
+        )
     }
 }
 
diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs
index 5797054f11b9c..429e4a189aed4 100644
--- a/datafusion/datasource-json/src/source.rs
+++ b/datafusion/datasource-json/src/source.rs
@@ -36,6 +36,7 @@ use datafusion_datasource::{
 use datafusion_physical_plan::projection::ProjectionExprs;
 use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
+use arrow::array::RecordBatch;
 use arrow::json::ReaderBuilder;
 use arrow::{datatypes::SchemaRef, json};
 use datafusion_datasource::file::FileSource;
@@ -54,6 +55,7 @@ pub struct JsonOpener {
     projected_schema: SchemaRef,
     file_compression_type: FileCompressionType,
     object_store: Arc<dyn ObjectStore>,
+    format_array: bool,
 }
 
 impl JsonOpener {
@@ -63,12 +65,14 @@ impl JsonOpener {
         projected_schema: SchemaRef,
         file_compression_type: FileCompressionType,
         object_store: Arc<dyn ObjectStore>,
+        format_array: bool,
     ) -> Self {
         Self {
             batch_size,
             projected_schema,
             file_compression_type,
             object_store,
+            format_array,
         }
     }
 }
@@ -80,6 +84,7 @@ pub struct JsonSource {
     batch_size: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
     projection: SplitProjection,
+    format_array: bool,
 }
 
 impl JsonSource {
@@ -91,8 +96,15 @@ impl JsonSource {
             table_schema,
             batch_size: None,
             metrics: ExecutionPlanMetricsSet::new(),
+            format_array: false,
         }
     }
+
+    /// Set whether to expect JSON array format
+    pub fn with_format_array(mut self, format_array: bool) -> Self {
+        self.format_array = format_array;
+        self
+    }
 }
 
 impl From<JsonSource> for Arc<dyn FileSource> {
@@ -120,6 +132,7 @@ impl FileSource for JsonSource {
             projected_schema,
             file_compression_type: base_config.file_compression_type,
             object_store,
+            format_array: self.format_array,
         }) as Arc<dyn FileOpener>;
 
         // Wrap with ProjectionOpener
@@ -186,6 +199,16 @@ impl FileOpener for JsonOpener {
         let schema = Arc::clone(&self.projected_schema);
         let batch_size = self.batch_size;
         let file_compression_type = self.file_compression_type.to_owned();
+        let format_array = self.format_array;
+
+        // JSON array format requires reading the complete file
+        if format_array && partitioned_file.range.is_some() {
+            return Err(DataFusionError::NotImplemented(
+                "JSON array format does not support range-based file scanning. \
+                Disable repartition_file_scans or use line-delimited JSON format."
+                    .to_string(),
+            ));
+        }
 
         Ok(Box::pin(async move {
             let calculated_range =
@@ -222,33 +245,94 @@ impl FileOpener for JsonOpener {
                         }
                     };
 
-                    let reader = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build(BufReader::new(bytes))?;
-
-                    Ok(futures::stream::iter(reader)
-                        .map(|r| r.map_err(Into::into))
-                        .boxed())
+                    if format_array {
+                        // Handle JSON array format
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(bytes),
+                            schema,
+                            batch_size,
+                        )?;
+                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    } else {
+                        let reader = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build(BufReader::new(bytes))?;
+                        Ok(futures::stream::iter(reader)
+                            .map(|r| r.map_err(Into::into))
+                            .boxed())
+                    }
                 }
                 GetResultPayload::Stream(s) => {
-                    let s = s.map_err(DataFusionError::from);
-
-                    let decoder = ReaderBuilder::new(schema)
-                        .with_batch_size(batch_size)
-                        .build_decoder()?;
-                    let input = file_compression_type.convert_stream(s.boxed())?.fuse();
-
-                    let stream = deserialize_stream(
-                        input,
-                        DecoderDeserializer::new(JsonDecoder::new(decoder)),
-                    );
-                    Ok(stream.map_err(Into::into).boxed())
+                    if format_array {
+                        // For streaming, we need to collect all bytes first
+                        let bytes = s
+                            .map_err(DataFusionError::from)
+                            .try_fold(Vec::new(), |mut acc, chunk| async move {
+                                acc.extend_from_slice(&chunk);
+                                Ok(acc)
+                            })
+                            .await?;
+                        let decompressed = file_compression_type
+                            .convert_read(std::io::Cursor::new(bytes))?;
+                        let batches = read_json_array_to_batches(
+                            BufReader::new(decompressed),
+                            schema,
+                            batch_size,
+                        )?;
+                        Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed())
+                    } else {
+                        let s = s.map_err(DataFusionError::from);
+                        let decoder = ReaderBuilder::new(schema)
+                            .with_batch_size(batch_size)
+                            .build_decoder()?;
+                        let input =
+                            file_compression_type.convert_stream(s.boxed())?.fuse();
+                        let stream = deserialize_stream(
+                            input,
+                            DecoderDeserializer::new(JsonDecoder::new(decoder)),
+                        );
+                        Ok(stream.map_err(Into::into).boxed())
+                    }
                 }
             }
         }))
     }
 }
+
+/// Read JSON array format and convert to RecordBatches
+fn read_json_array_to_batches<R: std::io::Read>(
+    mut reader: R,
+    schema: SchemaRef,
+    batch_size: usize,
+) -> Result<Vec<RecordBatch>> {
+    use arrow::json::ReaderBuilder;
+
+    let mut content = String::new();
+    reader.read_to_string(&mut content)?;
+
+    // Parse JSON array
+    let values: Vec<serde_json::Value> = serde_json::from_str(&content)
+        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+    if values.is_empty() {
+        return Ok(vec![RecordBatch::new_empty(schema)]);
+    }
+
+    // Convert to NDJSON string for arrow-json reader
+    let ndjson: String = values
+        .iter()
+        .map(|v| v.to_string())
+        .collect::<Vec<_>>()
+        .join("\n");
+
+    let cursor = std::io::Cursor::new(ndjson);
+    let reader = ReaderBuilder::new(schema)
+        .with_batch_size(batch_size)
+        .build(cursor)?;
+
+    reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
+}
 
 pub async fn plan_to_json(
     task_ctx: Arc<TaskContext>,
     plan: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto
index 08bb25bd715b9..eb8b1664744c0 100644
--- a/datafusion/proto-common/proto/datafusion_common.proto
+++ b/datafusion/proto-common/proto/datafusion_common.proto
@@ -469,6 +469,7 @@ message JsonOptions {
   CompressionTypeVariant compression = 1; // Compression type
   optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
   optional uint32 compression_level = 3; // Optional compression level
+  bool format_array = 4; // Whether the JSON is in array format [{},...]
(default false = line-delimited) } message TableParquetOptions { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 3c41b8cad9ed1..27acece6d3fd9 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1105,6 +1105,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions { compression: compression.into(), compression_level: proto_opts.compression_level, schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize), + format_array: proto_opts.format_array, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index ef0eae1981d93..352199c6693f9 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -4589,6 +4589,9 @@ impl serde::Serialize for JsonOptions { if self.compression_level.is_some() { len += 1; } + if self.format_array { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.JsonOptions", len)?; if self.compression != 0 { let v = CompressionTypeVariant::try_from(self.compression) @@ -4603,6 +4606,9 @@ impl serde::Serialize for JsonOptions { if let Some(v) = self.compression_level.as_ref() { struct_ser.serialize_field("compressionLevel", v)?; } + if self.format_array { + struct_ser.serialize_field("formatArray", &self.format_array)?; + } struct_ser.end() } } @@ -4618,6 +4624,8 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "schemaInferMaxRec", "compression_level", "compressionLevel", + "format_array", + "formatArray", ]; #[allow(clippy::enum_variant_names)] @@ -4625,6 +4633,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { Compression, SchemaInferMaxRec, CompressionLevel, + FormatArray, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -4649,6 +4658,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "compression" => Ok(GeneratedField::Compression), "schemaInferMaxRec" | "schema_infer_max_rec" => Ok(GeneratedField::SchemaInferMaxRec), "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel), + "formatArray" | "format_array" => Ok(GeneratedField::FormatArray), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4671,6 +4681,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { let mut compression__ = None; let mut schema_infer_max_rec__ = None; let mut compression_level__ = None; + let mut format_array__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Compression => { @@ -4695,12 +4706,19 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } + GeneratedField::FormatArray => { + if format_array__.is_some() { + return Err(serde::de::Error::duplicate_field("formatArray")); + } + format_array__ = Some(map_.next_value()?); + } } } Ok(JsonOptions { compression: compression__.unwrap_or_default(), schema_infer_max_rec: schema_infer_max_rec__, compression_level: compression_level__, + format_array: format_array__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 16601dcf46977..afdb6628225b2 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -665,6 +665,9 @@ pub struct JsonOptions { /// Optional compression level #[prost(uint32, optional, tag = "3")] pub compression_level: ::core::option::Option, + /// Whether the JSON is in array format \[{},...\] (default false = line-delimited) + #[prost(bool, tag = "4")] + pub format_array: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index fee3656482005..ef7563230f49d 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -990,6 +990,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions { compression: compression.into(), schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64), compression_level: opts.compression_level, + format_array: opts.format_array, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 16601dcf46977..afdb6628225b2 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -665,6 +665,9 @@ pub struct JsonOptions { /// Optional compression level #[prost(uint32, optional, tag = "3")] pub compression_level: ::core::option::Option, + /// Whether the JSON is in array format \[{},...\] (default false = line-delimited) + #[prost(bool, tag = "4")] + pub format_array: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 436a06493766d..e753e34462d86 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -241,6 +241,7 @@ impl JsonOptionsProto { compression: options.compression as i32, schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64), compression_level: options.compression_level, + format_array: options.format_array, } } else { JsonOptionsProto::default() @@ -260,6 +261,7 @@ impl From<&JsonOptionsProto> for JsonOptions { }, schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize), compression_level: proto.compression_level, + format_array: proto.format_array, } } } diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt index b46b8c49d6623..4442a6a2d5af2 100644 --- a/datafusion/sqllogictest/test_files/json.slt +++ b/datafusion/sqllogictest/test_files/json.slt @@ -146,3 +146,31 @@ EXPLAIN SELECT id FROM json_partitioned_test WHERE part = 2 ---- 
 logical_plan TableScan: json_partitioned_test projection=[id], full_filters=[json_partitioned_test.part = Int32(2)]
 physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_json/part=2/data.json]]}, projection=[id], file_type=json
+
+##########
+## JSON Array Format Tests
+##########
+
+# Test reading JSON array format file with format_array=true
+statement ok
+CREATE EXTERNAL TABLE json_array_test
+STORED AS JSON
+OPTIONS ('format.format_array' 'true')
+LOCATION '../core/tests/data/json_array.json';
+
+query IT rowsort
+SELECT a, b FROM json_array_test
+----
+1 hello
+2 world
+3 test
+
+statement ok
+DROP TABLE json_array_test;
+
+# Test that reading JSON array format WITHOUT format_array option fails
+# (default is line-delimited mode which can't parse array format correctly)
+statement error Not valid JSON
+CREATE EXTERNAL TABLE json_array_as_ndjson
+STORED AS JSON
+LOCATION '../core/tests/data/json_array.json';
\ No newline at end of file
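
For reference, a minimal end-to-end usage sketch of the new option (not part of the patch): it mirrors `test_ndjson_read_options_format_array` above and relies on the `NdJsonReadOptions::format_array` builder added in this change; the table name and file path are illustrative.

```rust
use datafusion::error::Result;
use datafusion::prelude::{NdJsonReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // The file contains a single JSON array, e.g.
    // [{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]
    let options = NdJsonReadOptions::default().format_array(true);
    ctx.register_json("json_array_table", "data/array.json", options)
        .await?;

    // Query it like any other table; schema inference and reads both go
    // through the array-format code paths added in this patch.
    ctx.sql("SELECT a, b FROM json_array_table ORDER BY a")
        .await?
        .show()
        .await?;

    Ok(())
}
```

The same behavior is available from SQL via `OPTIONS ('format.format_array' 'true')`, as exercised in the `json.slt` test above.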