diff --git a/Cargo.lock b/Cargo.lock index 63d32357b8f20..9561eb3fcf60f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2035,6 +2035,7 @@ dependencies = [ "datafusion-session", "futures", "object_store", + "serde_json", "tokio", ]
diff --git a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs index 347f1a0464716..27acf508563ba 100644 --- a/datafusion-examples/examples/custom_data_source/csv_json_opener.rs +++ b/datafusion-examples/examples/custom_data_source/csv_json_opener.rs @@ -125,6 +125,7 @@ async fn json_opener() -> Result<()> { projected, FileCompressionType::UNCOMPRESSED, Arc::new(object_store), + false, ); let scan_config = FileScanConfigBuilder::new(
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 4860393cab2b7..ba9ed78d7c735 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -3065,6 +3065,25 @@ config_namespace! { /// If not specified, the default level for the compression algorithm is used. pub compression_level: Option<u32>, default = None pub schema_infer_max_rec: Option<usize>, default = None + /// The JSON format to use when reading files. + /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + /// + /// Note: JSON array format requires loading the entire file into memory. + /// For large files, newline-delimited format is recommended. + pub newline_delimited: bool, default = true } }
diff --git a/datafusion/core/src/datasource/file_format/json.rs b/datafusion/core/src/datasource/file_format/json.rs index cb2e9d787ee92..648f100796a62 100644 --- a/datafusion/core/src/datasource/file_format/json.rs +++ b/datafusion/core/src/datasource/file_format/json.rs @@ -47,12 +47,54 @@ mod tests { use datafusion_common::stats::Precision; use datafusion_common::Result; + use datafusion_datasource::file_compression_type::FileCompressionType; use futures::StreamExt; use insta::assert_snapshot; use object_store::local::LocalFileSystem; use regex::Regex; use rstest::rstest; + + // ==================== Test Helpers ==================== + + /// Create a temporary JSON file and return (TempDir, path) + fn create_temp_json(content: &str) -> (tempfile::TempDir, String) { + let tmp_dir = tempfile::TempDir::new().unwrap(); + let path = format!("{}/test.json", tmp_dir.path().to_string_lossy()); + std::fs::write(&path, content).unwrap(); + (tmp_dir, path) + } + + /// Infer schema from JSON array format file + async fn infer_json_array_schema( + content: &str, + ) -> Result<SchemaRef> { + let (_tmp_dir, path) = create_temp_json(content); + let session = SessionContext::new(); + let ctx = session.state(); + let store = Arc::new(LocalFileSystem::new()) as _; + let format = JsonFormat::default().with_newline_delimited(false); + format + .infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)]) + .await + } + + /// Register a JSON array table and run a query + async fn query_json_array(content: &str, query: &str) -> Result<Vec<RecordBatch>> { + let (_tmp_dir, path) = create_temp_json(content); + let ctx = SessionContext::new(); + let options = NdJsonReadOptions::default().newline_delimited(false); + ctx.register_json("test_table", &path, options).await?; + ctx.sql(query).await?.collect().await + } + + /// Register a JSON array table and run a query, return formatted string + async fn query_json_array_str(content: &str, query: &str) -> Result<String> { + let result = query_json_array(content, query).await?; + Ok(batches_to_string(&result)) + } + + // ==================== Existing Tests ==================== + #[tokio::test] async fn read_small_batches() -> Result<()> { let config = SessionConfig::new().with_batch_size(2); @@ -314,7 +356,6 @@ mod tests { .digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into()); let mut all_batches = RecordBatch::new_empty(schema.clone()); - // We get RequiresMoreData after 2 batches because of how json::Decoder works for _ in 0..2 { let output = deserializer.next()?; let DeserializerOutput::RecordBatch(batch) = output else { @@ -358,7 +399,6 @@ mod tests { let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?; df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None) .await?; - // Expected the file to exist and be empty assert!(std::path::Path::new(&path).exists()); let metadata = std::fs::metadata(&path)?; assert_eq!(metadata.len(), 0); @@ -385,10 +425,210 @@ mod tests { let df = ctx.read_batch(empty_batch.clone())?; df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None) .await?; - // Expected the file to exist and be empty assert!(std::path::Path::new(&path).exists()); let metadata = std::fs::metadata(&path)?; assert_eq!(metadata.len(), 0); Ok(()) } + + // ==================== JSON Array Format Tests ==================== + + #[tokio::test] + async fn test_json_array_schema_inference() -> Result<()> { + let schema = infer_json_array_schema( + r#"[{"a": 1, "b": 2.0, "c": true}, {"a": 2, "b": 3.5, "c": false}]"#, + ) + .await?; + + let fields: Vec<_> = schema + .fields() + .iter() + .map(|f| format!("{}: {:?}", f.name(), f.data_type())) + .collect(); + assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_empty() -> Result<()> { + let schema = infer_json_array_schema("[]").await?; + assert_eq!(schema.fields().len(), 0); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_nested_struct() -> Result<()> { + let schema = infer_json_array_schema( + r#"[{"id": 1, "info": {"name": "Alice", "age": 30}}]"#, + ) + .await?; + + let info_field = schema.field_with_name("info").unwrap(); + assert!(matches!(info_field.data_type(), DataType::Struct(_))); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_list_type() -> Result<()> { + let schema = + infer_json_array_schema(r#"[{"id": 1, "tags": ["a", "b", "c"]}]"#).await?; + + let tags_field = schema.field_with_name("tags").unwrap(); + assert!(matches!(tags_field.data_type(), DataType::List(_))); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_basic_query() -> Result<()> { + let result = query_json_array_str( + r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}, {"a": 3, "b": "test"}]"#, + "SELECT a, b FROM test_table ORDER BY a", + ) + .await?; + + assert_snapshot!(result, @r" + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + | 3 | test | + +---+-------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_with_nulls() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "name": "Alice"}, {"id": 2, "name": null}, {"id": 3, "name": "Charlie"}]"#, + "SELECT id, name FROM test_table ORDER BY id", + ) + .await?; + + assert_snapshot!(result, @r" + +----+---------+ + | id | name | + +----+---------+ + | 1 | Alice | + | 2 | | + | 3 | Charlie
| + +----+---------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_unnest() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]"#, + "SELECT id, unnest(values) as value FROM test_table ORDER BY id, value", + ) + .await?; + + assert_snapshot!(result, @r" + +----+-------+ + | id | value | + +----+-------+ + | 1 | 10 | + | 1 | 20 | + | 1 | 30 | + | 2 | 40 | + | 2 | 50 | + +----+-------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_unnest_struct() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": "B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#, + "SELECT id, unnest(orders)['product'] as product, unnest(orders)['qty'] as qty FROM test_table ORDER BY id, product", + ) + .await?; + + assert_snapshot!(result, @r" + +----+---------+-----+ + | id | product | qty | + +----+---------+-----+ + | 1 | A | 2 | + | 1 | B | 3 | + | 2 | C | 1 | + +----+---------+-----+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_nested_struct_access() -> Result<()> { + let result = query_json_array_str( + r#"[{"id": 1, "dept": {"name": "Engineering", "head": "Alice"}}, {"id": 2, "dept": {"name": "Sales", "head": "Bob"}}]"#, + "SELECT id, dept['name'] as dept_name, dept['head'] as head FROM test_table ORDER BY id", + ) + .await?; + + assert_snapshot!(result, @r" + +----+-------------+-------+ + | id | dept_name | head | + +----+-------------+-------+ + | 1 | Engineering | Alice | + | 2 | Sales | Bob | + +----+-------------+-------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_with_compression() -> Result<()> { + use flate2::Compression; + use flate2::write::GzEncoder; + use std::io::Write; + + let tmp_dir = tempfile::TempDir::new()?; + let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy()); + + let file = std::fs::File::create(&path)?; + let mut encoder = GzEncoder::new(file, Compression::default()); + encoder.write_all( + r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#.as_bytes(), + )?; + encoder.finish()?; + + let ctx = SessionContext::new(); + let options = NdJsonReadOptions::default() + .newline_delimited(false) + .file_compression_type(FileCompressionType::GZIP) + .file_extension(".json.gz"); + + ctx.register_json("test_table", &path, options).await?; + let result = ctx + .sql("SELECT a, b FROM test_table ORDER BY a") + .await? + .collect() + .await?; + + assert_snapshot!(batches_to_string(&result), @r" + +---+-------+ + | a | b | + +---+-------+ + | 1 | hello | + | 2 | world | + +---+-------+ + "); + Ok(()) + } + + #[tokio::test] + async fn test_json_array_list_of_structs() -> Result<()> { + let batches = query_json_array( + r#"[{"id": 1, "items": [{"name": "x", "price": 10.5}]}, {"id": 2, "items": []}]"#, + "SELECT id, items FROM test_table ORDER BY id", + ) + .await?; + + assert_eq!(1, batches.len()); + assert_eq!(2, batches[0].num_rows()); + Ok(()) + } } diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs index 146c5f6f5fd0f..1798bca86b035 100644 --- a/datafusion/core/src/datasource/file_format/options.rs +++ b/datafusion/core/src/datasource/file_format/options.rs @@ -442,7 +442,9 @@ impl<'a> AvroReadOptions<'a> { } } -/// Options that control the reading of Line-delimited JSON files (NDJson) +/// Options that control the reading of JSON files. 
+/// +/// Supports both newline-delimited JSON (NDJSON) and JSON array formats. /// /// Note this structure is supplied when a datasource is created and /// can not not vary from statement to statement. For settings that @@ -465,6 +467,22 @@ pub struct NdJsonReadOptions<'a> { pub infinite: bool, /// Indicates how the file is sorted pub file_sort_order: Vec>, + /// Whether to read as newline-delimited JSON (default: true). + /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub newline_delimited: bool, } impl Default for NdJsonReadOptions<'_> { @@ -477,6 +495,7 @@ impl Default for NdJsonReadOptions<'_> { file_compression_type: FileCompressionType::UNCOMPRESSED, infinite: false, file_sort_order: vec![], + newline_delimited: true, } } } @@ -529,6 +548,26 @@ impl<'a> NdJsonReadOptions<'a> { self.schema_infer_max_records = schema_infer_max_records; self } + + /// Set whether to read as newline-delimited JSON. + /// + /// When `true` (default), expects newline-delimited JSON (NDJSON): + /// ```text + /// {"key1": 1, "key2": "val"} + /// {"key1": 2, "key2": "vals"} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [ + /// {"key1": 1, "key2": "val"}, + /// {"key1": 2, "key2": "vals"} + /// ] + /// ``` + pub fn newline_delimited(mut self, newline_delimited: bool) -> Self { + self.newline_delimited = newline_delimited; + self + } } #[async_trait] @@ -663,7 +702,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> { let file_format = JsonFormat::default() .with_options(table_options.json) .with_schema_infer_max_rec(self.schema_infer_max_records) - .with_file_compression_type(self.file_compression_type.to_owned()); + .with_file_compression_type(self.file_compression_type.to_owned()) + .with_newline_delimited(self.newline_delimited); ListingOptions::new(Arc::new(file_format)) .with_file_extension(self.file_extension) diff --git a/datafusion/core/tests/data/json_array.json b/datafusion/core/tests/data/json_array.json new file mode 100644 index 0000000000000..1a8716dbf4beb --- /dev/null +++ b/datafusion/core/tests/data/json_array.json @@ -0,0 +1,5 @@ +[ + {"a": 1, "b": "hello"}, + {"a": 2, "b": "world"}, + {"a": 3, "b": "test"} +] diff --git a/datafusion/core/tests/data/json_empty_array.json b/datafusion/core/tests/data/json_empty_array.json new file mode 100644 index 0000000000000..fe51488c7066f --- /dev/null +++ b/datafusion/core/tests/data/json_empty_array.json @@ -0,0 +1 @@ +[] diff --git a/datafusion/datasource-json/Cargo.toml b/datafusion/datasource-json/Cargo.toml index 37fa8d43a0816..168ae8880eee7 100644 --- a/datafusion/datasource-json/Cargo.toml +++ b/datafusion/datasource-json/Cargo.toml @@ -44,6 +44,7 @@ datafusion-physical-plan = { workspace = true } datafusion-session = { workspace = true } futures = { workspace = true } object_store = { workspace = true } +serde_json = { workspace = true } tokio = { workspace = true } # Note: add additional linter rules in lib.rs. 
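For reference, the user-facing surface introduced above can be exercised as follows. This is a minimal sketch, not part of the patch: the file path and table name are illustrative, and it assumes the usual `datafusion` prelude re-exports; the `newline_delimited(false)` builder and `register_json` call mirror the test helpers added in `json.rs`.

```rust
use datafusion::error::Result;
use datafusion::prelude::{NdJsonReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // `newline_delimited(false)` switches the reader to JSON array format,
    // i.e. a file whose whole content is `[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]`.
    // In this mode the entire file is buffered in memory.
    let options = NdJsonReadOptions::default().newline_delimited(false);

    // "data/array.json" is an illustrative path, not a file added by this patch.
    ctx.register_json("events", "data/array.json", options).await?;
    ctx.sql("SELECT a, b FROM events ORDER BY a").await?.show().await?;

    Ok(())
}
```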
diff --git a/datafusion/datasource-json/src/file_format.rs b/datafusion/datasource-json/src/file_format.rs index a14458b5acd36..46e67f5768286 100644 --- a/datafusion/datasource-json/src/file_format.rs +++ b/datafusion/datasource-json/src/file_format.rs @@ -15,13 +15,13 @@ // specific language governing permissions and limitations // under the License. -//! [`JsonFormat`]: Line delimited JSON [`FileFormat`] abstractions +//! [`JsonFormat`]: Line delimited and array JSON [`FileFormat`] abstractions use std::any::Any; use std::collections::HashMap; use std::fmt; use std::fmt::Debug; -use std::io::BufReader; +use std::io::{BufReader, Read}; use std::sync::Arc; use crate::source::JsonSource; @@ -30,11 +30,14 @@ use arrow::array::RecordBatch; use arrow::datatypes::{Schema, SchemaRef}; use arrow::error::ArrowError; use arrow::json; -use arrow::json::reader::{ValueIter, infer_json_schema_from_iterator}; +use arrow::json::reader::{ + ValueIter, infer_json_schema, infer_json_schema_from_iterator, +}; +use bytes::{Buf, Bytes}; use datafusion_common::config::{ConfigField, ConfigFileType, JsonOptions}; use datafusion_common::file_options::json_writer::JsonWriterOptions; use datafusion_common::{ - DEFAULT_JSON_EXTENSION, GetExt, Result, Statistics, not_impl_err, + DEFAULT_JSON_EXTENSION, DataFusionError, GetExt, Result, Statistics, not_impl_err, }; use datafusion_common_runtime::SpawnedTask; use datafusion_datasource::TableSchema; @@ -48,6 +51,7 @@ use datafusion_datasource::file_format::{ use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig}; use datafusion_datasource::sink::{DataSink, DataSinkExec}; +use datafusion_datasource::source::DataSourceExec; use datafusion_datasource::write::BatchSerializer; use datafusion_datasource::write::demux::DemuxedStreamReceiver; use datafusion_datasource::write::orchestration::spawn_writer_tasks_and_join; @@ -58,8 +62,6 @@ use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use datafusion_session::Session; use async_trait::async_trait; -use bytes::{Buf, Bytes}; -use datafusion_datasource::source::DataSourceExec; use object_store::{GetResultPayload, ObjectMeta, ObjectStore}; #[derive(Default)] @@ -132,7 +134,26 @@ impl Debug for JsonFormatFactory { } } -/// New line delimited JSON `FileFormat` implementation. +/// JSON `FileFormat` implementation supporting both line-delimited and array formats. +/// +/// # Supported Formats +/// +/// ## Line-Delimited JSON (default, `newline_delimited = true`) +/// ```text +/// {"key1": 1, "key2": "val"} +/// {"key1": 2, "key2": "vals"} +/// ``` +/// +/// ## JSON Array Format (`newline_delimited = false`) +/// ```text +/// [ +/// {"key1": 1, "key2": "val"}, +/// {"key1": 2, "key2": "vals"} +/// ] +/// ``` +/// +/// Note: JSON array format requires loading the entire file into memory, +/// which may not be suitable for very large files. #[derive(Debug, Default)] pub struct JsonFormat { options: JsonOptions, @@ -166,6 +187,107 @@ impl JsonFormat { self.options.compression = file_compression_type.into(); self } + + /// Set whether to read as newline-delimited JSON (NDJSON). 
+ /// + /// When `true` (default), expects newline-delimited format: + /// ```text + /// {"a": 1} + /// {"a": 2} + /// ``` + /// + /// When `false`, expects JSON array format: + /// ```text + /// [{"a": 1}, {"a": 2}] + /// ``` + pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self { + self.options.newline_delimited = newline_delimited; + self + } + + /// Returns whether this format expects newline-delimited JSON. + pub fn is_newline_delimited(&self) -> bool { + self.options.newline_delimited + } +} + +/// Extract JSON records from array format using bracket tracking. +/// +/// This avoids full JSON parsing by only tracking brace depth to find +/// record boundaries. Much faster than serde_json::from_str() for large files. +fn extract_json_records(content: &str) -> Result<Vec<String>> { + let content = content.trim(); + if !content.starts_with('[') || !content.ends_with(']') { + return Err(DataFusionError::Execution( + "JSON array format must start with '[' and end with ']'".to_string(), + )); + } + + // Remove outer brackets + let inner = &content[1..content.len() - 1]; + let mut records = Vec::new(); + let mut depth = 0; + let mut in_string = false; + let mut escape_next = false; + let mut record_start: Option<usize> = None; + + for (i, ch) in inner.char_indices() { + if escape_next { + escape_next = false; + continue; + } + + match ch { + '\\' if in_string => escape_next = true, + '"' => in_string = !in_string, + '{' if !in_string => { + if depth == 0 { + record_start = Some(i); + } + depth += 1; + } + '}' if !in_string => { + depth -= 1; + if depth == 0 + && let Some(start) = record_start + { + records.push(inner[start..=i].to_string()); + record_start = None; + } + } + _ => {} + } + } + + Ok(records) +} + +/// Infer schema from JSON array format content (synchronous version). +/// +/// This function extracts individual JSON records from array format +/// and uses arrow-json's schema inference on the extracted records. +fn infer_schema_from_json_array_content( + content: &str, + max_records: usize, +) -> Result<Schema> { + let records = extract_json_records(content)?; + + let records_to_infer: Vec<&str> = records + .iter() + .take(max_records) + .map(|s| s.as_str()) + .collect(); + + if records_to_infer.is_empty() { + return Ok(Schema::empty()); + } + + // Create NDJSON string for arrow-json schema inference + let ndjson = records_to_infer.join("\n"); + let cursor = std::io::Cursor::new(ndjson.as_bytes()); + + let (schema, _) = infer_json_schema(cursor, Some(max_records))?; + Ok(schema) } #[async_trait] @@ -202,6 +324,8 @@ impl FileFormat for JsonFormat { .schema_infer_max_rec .unwrap_or(DEFAULT_SCHEMA_INFER_MAX_RECORD); let file_compression_type = FileCompressionType::from(self.options.compression); + let newline_delimited = self.options.newline_delimited; + for object in objects { let mut take_while = || { let should_take = records_to_read > 0; @@ -217,15 +341,35 @@ impl FileFormat for JsonFormat { GetResultPayload::File(file, _) => { let decoder = file_compression_type.convert_read(file)?; let mut reader = BufReader::new(decoder); - let iter = ValueIter::new(&mut reader, None); - infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? + + if newline_delimited { + let iter = ValueIter::new(&mut reader, None); + infer_json_schema_from_iterator( + iter.take_while(|_| take_while()), + )?
+ } else { + // JSON array format: read content and extract records + let mut content = String::new(); + reader.read_to_string(&mut content)?; + infer_schema_from_json_array_content(&content, records_to_read)? + } } GetResultPayload::Stream(_) => { let data = r.bytes().await?; let decoder = file_compression_type.convert_read(data.reader())?; let mut reader = BufReader::new(decoder); - let iter = ValueIter::new(&mut reader, None); - infer_json_schema_from_iterator(iter.take_while(|_| take_while()))? + + if newline_delimited { + let iter = ValueIter::new(&mut reader, None); + infer_json_schema_from_iterator( + iter.take_while(|_| take_while()), + )? + } else { + // JSON array format: read content and extract records + let mut content = String::new(); + reader.read_to_string(&mut content)?; + infer_schema_from_json_array_content(&content, records_to_read)? + } } }; @@ -281,7 +425,10 @@ impl FileFormat for JsonFormat { } fn file_source(&self, table_schema: TableSchema) -> Arc { - Arc::new(JsonSource::new(table_schema)) + Arc::new( + JsonSource::new(table_schema) + .with_newline_delimited(self.options.newline_delimited), + ) } } diff --git a/datafusion/datasource-json/src/source.rs b/datafusion/datasource-json/src/source.rs index 5797054f11b9c..40113b0af9921 100644 --- a/datafusion/datasource-json/src/source.rs +++ b/datafusion/datasource-json/src/source.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -//! Execution plan for reading line-delimited JSON files +//! Execution plan for reading JSON files (line-delimited and array formats) use std::any::Any; use std::io::{BufReader, Read, Seek, SeekFrom}; @@ -36,6 +36,7 @@ use datafusion_datasource::{ use datafusion_physical_plan::projection::ProjectionExprs; use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; +use arrow::array::RecordBatch; use arrow::json::ReaderBuilder; use arrow::{datatypes::SchemaRef, json}; use datafusion_datasource::file::FileSource; @@ -54,21 +55,26 @@ pub struct JsonOpener { projected_schema: SchemaRef, file_compression_type: FileCompressionType, object_store: Arc, + /// When `true` (default), expects newline-delimited JSON (NDJSON). + /// When `false`, expects JSON array format `[{...}, {...}]`. + newline_delimited: bool, } impl JsonOpener { - /// Returns a [`JsonOpener`] + /// Returns a [`JsonOpener`] pub fn new( batch_size: usize, projected_schema: SchemaRef, file_compression_type: FileCompressionType, object_store: Arc, + newline_delimited: bool, ) -> Self { Self { batch_size, projected_schema, file_compression_type, object_store, + newline_delimited, } } } @@ -80,6 +86,9 @@ pub struct JsonSource { batch_size: Option, metrics: ExecutionPlanMetricsSet, projection: SplitProjection, + /// When `true` (default), expects newline-delimited JSON (NDJSON). + /// When `false`, expects JSON array format `[{...}, {...}]`. + newline_delimited: bool, } impl JsonSource { @@ -91,8 +100,18 @@ impl JsonSource { table_schema, batch_size: None, metrics: ExecutionPlanMetricsSet::new(), + newline_delimited: true, } } + + /// Set whether to read as newline-delimited JSON. + /// + /// When `true` (default), expects newline-delimited format. + /// When `false`, expects JSON array format `[{...}, {...}]`. 
+ pub fn with_newline_delimited(mut self, newline_delimited: bool) -> Self { + self.newline_delimited = newline_delimited; + self + } } impl From for Arc { @@ -120,6 +139,7 @@ impl FileSource for JsonSource { projected_schema, file_compression_type: base_config.file_compression_type, object_store, + newline_delimited: self.newline_delimited, }) as Arc; // Wrap with ProjectionOpener @@ -172,7 +192,7 @@ impl FileSource for JsonSource { } impl FileOpener for JsonOpener { - /// Open a partitioned NDJSON file. + /// Open a partitioned JSON file. /// /// If `file_meta.range` is `None`, the entire file is opened. /// Else `file_meta.range` is `Some(FileRange{start, end})`, which corresponds to the byte range [start, end) within the file. @@ -181,11 +201,23 @@ impl FileOpener for JsonOpener { /// are applied to determine which lines to read: /// 1. The first line of the partition is the line in which the index of the first character >= `start`. /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides. + /// + /// Note: JSON array format does not support range-based scanning. fn open(&self, partitioned_file: PartitionedFile) -> Result { let store = Arc::clone(&self.object_store); let schema = Arc::clone(&self.projected_schema); let batch_size = self.batch_size; let file_compression_type = self.file_compression_type.to_owned(); + let newline_delimited = self.newline_delimited; + + // JSON array format requires reading the complete file + if !newline_delimited && partitioned_file.range.is_some() { + return Err(DataFusionError::NotImplemented( + "JSON array format does not support range-based file scanning. \ + Disable repartition_file_scans or use newline-delimited JSON format." + .to_string(), + )); + } Ok(Box::pin(async move { let calculated_range = @@ -222,33 +254,97 @@ impl FileOpener for JsonOpener { } }; - let reader = ReaderBuilder::new(schema) - .with_batch_size(batch_size) - .build(BufReader::new(bytes))?; - - Ok(futures::stream::iter(reader) - .map(|r| r.map_err(Into::into)) - .boxed()) + if newline_delimited { + // Newline-delimited JSON (NDJSON) reader + let reader = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build(BufReader::new(bytes))?; + Ok(futures::stream::iter(reader) + .map(|r| r.map_err(Into::into)) + .boxed()) + } else { + // JSON array format reader + let batches = read_json_array_to_batches( + BufReader::new(bytes), + schema, + batch_size, + )?; + Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed()) + } } GetResultPayload::Stream(s) => { - let s = s.map_err(DataFusionError::from); - - let decoder = ReaderBuilder::new(schema) - .with_batch_size(batch_size) - .build_decoder()?; - let input = file_compression_type.convert_stream(s.boxed())?.fuse(); - - let stream = deserialize_stream( - input, - DecoderDeserializer::new(JsonDecoder::new(decoder)), - ); - Ok(stream.map_err(Into::into).boxed()) + if newline_delimited { + // Newline-delimited JSON (NDJSON) streaming reader + let s = s.map_err(DataFusionError::from); + let decoder = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build_decoder()?; + let input = + file_compression_type.convert_stream(s.boxed())?.fuse(); + let stream = deserialize_stream( + input, + DecoderDeserializer::new(JsonDecoder::new(decoder)), + ); + Ok(stream.map_err(Into::into).boxed()) + } else { + // JSON array format: collect all bytes first + let bytes = s + .map_err(DataFusionError::from) + .try_fold(Vec::new(), |mut acc, chunk| async move { + acc.extend_from_slice(&chunk); 
+ Ok(acc) + }) + .await?; + let decompressed = file_compression_type + .convert_read(std::io::Cursor::new(bytes))?; + let batches = read_json_array_to_batches( + BufReader::new(decompressed), + schema, + batch_size, + )?; + Ok(futures::stream::iter(batches.into_iter().map(Ok)).boxed()) + } } } })) } } +/// Read JSON array format and convert to RecordBatches. +/// +/// Parses a JSON array `[{...}, {...}, ...]` and converts each object +/// to Arrow RecordBatches using the provided schema. +fn read_json_array_to_batches<R: Read>( + mut reader: R, + schema: SchemaRef, + batch_size: usize, +) -> Result<Vec<RecordBatch>> { + let mut content = String::new(); + reader.read_to_string(&mut content)?; + + // Parse JSON array + let values: Vec<serde_json::Value> = serde_json::from_str(&content) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + if values.is_empty() { + return Ok(vec![RecordBatch::new_empty(schema)]); + } + + // Convert to NDJSON string for arrow-json reader + let ndjson: String = values + .iter() + .map(|v| v.to_string()) + .collect::<Vec<_>>() + .join("\n"); + + let cursor = std::io::Cursor::new(ndjson); + let reader = ReaderBuilder::new(schema) + .with_batch_size(batch_size) + .build(cursor)?; + + reader.collect::<Result<Vec<_>, _>>().map_err(Into::into) +} + pub async fn plan_to_json( task_ctx: Arc<TaskContext>, plan: Arc<dyn ExecutionPlan>,
diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 08bb25bd715b9..fa6cb17152576 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -469,6 +469,7 @@ message JsonOptions { CompressionTypeVariant compression = 1; // Compression type optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference optional uint32 compression_level = 3; // Optional compression level + bool newline_delimited = 4; // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...]
} message TableParquetOptions { diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 3c41b8cad9ed1..b98c48b30ba95 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -1105,6 +1105,7 @@ impl TryFrom<&protobuf::JsonOptions> for JsonOptions { compression: compression.into(), compression_level: proto_opts.compression_level, schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize), + newline_delimited: proto_opts.newline_delimited, }) } } diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index ef0eae1981d93..cf36bb335f4ca 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -4589,6 +4589,9 @@ impl serde::Serialize for JsonOptions { if self.compression_level.is_some() { len += 1; } + if self.newline_delimited { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion_common.JsonOptions", len)?; if self.compression != 0 { let v = CompressionTypeVariant::try_from(self.compression) @@ -4603,6 +4606,9 @@ impl serde::Serialize for JsonOptions { if let Some(v) = self.compression_level.as_ref() { struct_ser.serialize_field("compressionLevel", v)?; } + if self.newline_delimited { + struct_ser.serialize_field("newlineDelimited", &self.newline_delimited)?; + } struct_ser.end() } } @@ -4618,6 +4624,8 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "schemaInferMaxRec", "compression_level", "compressionLevel", + "newline_delimited", + "newlineDelimited", ]; #[allow(clippy::enum_variant_names)] @@ -4625,6 +4633,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { Compression, SchemaInferMaxRec, CompressionLevel, + NewlineDelimited, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -4649,6 +4658,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { "compression" => Ok(GeneratedField::Compression), "schemaInferMaxRec" | "schema_infer_max_rec" => Ok(GeneratedField::SchemaInferMaxRec), "compressionLevel" | "compression_level" => Ok(GeneratedField::CompressionLevel), + "newlineDelimited" | "newline_delimited" => Ok(GeneratedField::NewlineDelimited), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4671,6 +4681,7 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { let mut compression__ = None; let mut schema_infer_max_rec__ = None; let mut compression_level__ = None; + let mut newline_delimited__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Compression => { @@ -4695,12 +4706,19 @@ impl<'de> serde::Deserialize<'de> for JsonOptions { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } + GeneratedField::NewlineDelimited => { + if newline_delimited__.is_some() { + return Err(serde::de::Error::duplicate_field("newlineDelimited")); + } + newline_delimited__ = Some(map_.next_value()?); + } } } Ok(JsonOptions { compression: compression__.unwrap_or_default(), schema_infer_max_rec: schema_infer_max_rec__, compression_level: compression_level__, + newline_delimited: newline_delimited__.unwrap_or_default(), }) } } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 16601dcf46977..a1d4f5ce25425 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -665,6 +665,9 @@ pub struct JsonOptions { /// Optional compression level #[prost(uint32, optional, tag = "3")] pub compression_level: ::core::option::Option, + /// Whether to read as newline-delimited JSON (default true). When false, expects JSON array format \[{},...\] + #[prost(bool, tag = "4")] + pub newline_delimited: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index fee3656482005..ddb6da9848166 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -990,6 +990,7 @@ impl TryFrom<&JsonOptions> for protobuf::JsonOptions { compression: compression.into(), schema_infer_max_rec: opts.schema_infer_max_rec.map(|h| h as u64), compression_level: opts.compression_level, + newline_delimited: opts.newline_delimited, }) } } diff --git a/datafusion/proto/src/generated/datafusion_proto_common.rs b/datafusion/proto/src/generated/datafusion_proto_common.rs index 16601dcf46977..a1d4f5ce25425 100644 --- a/datafusion/proto/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto/src/generated/datafusion_proto_common.rs @@ -665,6 +665,9 @@ pub struct JsonOptions { /// Optional compression level #[prost(uint32, optional, tag = "3")] pub compression_level: ::core::option::Option, + /// Whether to read as newline-delimited JSON (default true). 
When false, expects JSON array format \[{},...\] + #[prost(bool, tag = "4")] + pub newline_delimited: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct TableParquetOptions { diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index 436a06493766d..adad64162f3c1 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -241,6 +241,7 @@ impl JsonOptionsProto { compression: options.compression as i32, schema_infer_max_rec: options.schema_infer_max_rec.map(|v| v as u64), compression_level: options.compression_level, + newline_delimited: options.newline_delimited, } } else { JsonOptionsProto::default() @@ -260,6 +261,7 @@ impl From<&JsonOptionsProto> for JsonOptions { }, schema_infer_max_rec: proto.schema_infer_max_rec.map(|v| v as usize), compression_level: proto.compression_level, + newline_delimited: proto.newline_delimited, } } } diff --git a/datafusion/sqllogictest/test_files/json.slt b/datafusion/sqllogictest/test_files/json.slt index b46b8c49d6623..60bec4213db02 100644 --- a/datafusion/sqllogictest/test_files/json.slt +++ b/datafusion/sqllogictest/test_files/json.slt @@ -146,3 +146,31 @@ EXPLAIN SELECT id FROM json_partitioned_test WHERE part = 2 ---- logical_plan TableScan: json_partitioned_test projection=[id], full_filters=[json_partitioned_test.part = Int32(2)] physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/partitioned_table_json/part=2/data.json]]}, projection=[id], file_type=json + +########## +## JSON Array Format Tests +########## + +# Test reading JSON array format file with newline_delimited=false +statement ok +CREATE EXTERNAL TABLE json_array_test +STORED AS JSON +OPTIONS ('format.newline_delimited' 'false') +LOCATION '../core/tests/data/json_array.json'; + +query IT rowsort +SELECT a, b FROM json_array_test +---- +1 hello +2 world +3 test + +statement ok +DROP TABLE json_array_test; + +# Test that reading JSON array format WITHOUT newline_delimited option fails +# (default is newline_delimited=true which can't parse array format correctly) +statement error Not valid JSON +CREATE EXTERNAL TABLE json_array_as_ndjson +STORED AS JSON +LOCATION '../core/tests/data/json_array.json';
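The same switch is also available below the SQL layer through the `JsonFormat` builder added in `file_format.rs`. A minimal sketch, assuming the usual `datafusion` re-export paths and an illustrative file path; `ListingOptions` and `register_listing_table` are existing DataFusion APIs, not part of this patch:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::json::JsonFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Equivalent of `OPTIONS ('format.newline_delimited' 'false')`, but through the
    // JsonFormat/ListingOptions API: the same flag drives schema inference and the
    // JsonSource opener created by `file_source()`.
    let format = JsonFormat::default().with_newline_delimited(false);
    let options = ListingOptions::new(Arc::new(format)).with_file_extension(".json");

    // Illustrative path; any JSON array file such as `[{"a": 1}, {"a": 2}]` works.
    ctx.register_listing_table("events", "data/array.json", options, None, None)
        .await?;
    ctx.sql("SELECT count(*) FROM events").await?.show().await?;

    Ok(())
}
```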