19924: Support JSON arrays reader/parse for datafusion#206
martin-augment wants to merge 4 commits into main
Conversation
Summary of Changes

Hello @martin-augment, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances DataFusion's capabilities for ingesting JSON data by introducing support for JSON array formatted files. Previously, DataFusion primarily handled newline-delimited JSON. With this change, users can now process JSON files where the entire content is a single array of JSON objects, providing greater flexibility in integrating with various data sources. The implementation includes a new configuration option to specify the expected JSON format and updates to the underlying file format and source readers to correctly parse and infer schemas from both formats.
Walkthrough

This pull request adds support for JSON array format alongside the existing newline-delimited JSON (NDJSON) format in DataFusion. A new newline_delimited option on JsonOptions and NdJsonReadOptions selects which of the two formats a file is expected to use.
PR Review: JSON Array Format Support

Summary

This PR adds support for reading JSON array format in addition to the existing newline-delimited JSON (NDJSON) format. The implementation is well-structured with comprehensive tests.

✅ Strengths

🔍 Critical Issues

Issue 1: Unbounded Memory Allocation
Code Review
This pull request introduces support for reading JSON array formatted files in DataFusion, alongside the existing newline-delimited JSON (NDJSON) format. The changes include adding a newline_delimited option to JsonOptions and NdJsonReadOptions, updating the file format and source implementations to handle both formats, and providing comprehensive tests for the new functionality. The protobuf definitions and serialization/deserialization logic have also been updated accordingly. Overall, the changes are well-structured and add valuable functionality.
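For orientation, here is a rough, hypothetical sketch of how the new option might be used from the SessionContext API, assuming the newline_delimited flag added by this PR is exposed as a public field on NdJsonReadOptions (the exact field/setter surface is an assumption, not confirmed by this review):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Assumption: the PR exposes `newline_delimited` on NdJsonReadOptions.
    // Setting it to false switches the reader to JSON array mode `[ {...}, {...} ]`.
    let mut options = NdJsonReadOptions::default();
    options.newline_delimited = false;

    let df = ctx.read_json("data.json", options).await?;
    df.show().await?;
    Ok(())
}
```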
fn extract_json_records(content: &str) -> Result<Vec<String>> {
    let content = content.trim();
    if !content.starts_with('[') || !content.ends_with(']') {
        return Err(DataFusionError::Execution(
            "JSON array format must start with '[' and end with ']'".to_string(),
        ));
    }

    // Remove outer brackets
    let inner = &content[1..content.len() - 1];
    let mut records = Vec::new();
    let mut depth = 0;
    let mut in_string = false;
    let mut escape_next = false;
    let mut record_start: Option<usize> = None;

    for (i, ch) in inner.char_indices() {
        if escape_next {
            escape_next = false;
            continue;
        }

        match ch {
            '\\' if in_string => escape_next = true,
            '"' => in_string = !in_string,
            '{' if !in_string => {
                if depth == 0 {
                    record_start = Some(i);
                }
                depth += 1;
            }
            '}' if !in_string => {
                depth -= 1;
                if depth == 0
                    && let Some(start) = record_start
                {
                    records.push(inner[start..=i].to_string());
                    record_start = None;
                }
            }
            _ => {}
        }
    }

    Ok(records)
The extract_json_records function is implemented to avoid full serde_json parsing for performance during schema inference. However, the read_json_array_to_batches function (in src/source.rs), which is responsible for actual data reading, still uses serde_json::from_str to parse the entire file content. This seems contradictory to the stated performance goal for large files and could lead to performance bottlenecks and high memory usage during data processing for large JSON array files. Consider aligning the parsing strategy for data reading with the more efficient approach used for schema inference, or explore streaming JSON parsing options for large files.
value:useful; category:security; feedback:The Gemini AI reviewer is correct! The proposed changes load and parse the whole JSON file when it is an array (in contrast to NDJSON). This may consume a lot of memory if the JSON file is big. Also, after parsing, it serializes the JSON objects to NDJSON format, and then this NDJSON is parsed again. It would be much better to parse the JSON array in a streaming fashion; this way it won't consume much memory and the work will be done just once.
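As a minimal, standalone sketch of the streaming direction suggested here (not DataFusion's actual code; the helper name and callback are hypothetical), serde's SeqAccess visitor can walk a top-level JSON array element by element without materializing the whole Vec<Value>:

```rust
use std::fmt;
use std::io::Read;

use serde::de::{SeqAccess, Visitor};
use serde::Deserializer;

/// Visits a top-level JSON array and hands each element to a callback,
/// so the whole file never has to live in memory as one Vec<Value>.
struct ForEachElement<F>(F);

impl<'de, F> Visitor<'de> for ForEachElement<F>
where
    F: FnMut(serde_json::Value),
{
    type Value = ();

    fn expecting(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "a JSON array of objects")
    }

    fn visit_seq<A>(mut self, mut seq: A) -> Result<(), A::Error>
    where
        A: SeqAccess<'de>,
    {
        // Each next_element() call parses exactly one array element.
        while let Some(value) = seq.next_element::<serde_json::Value>()? {
            (self.0)(value);
        }
        Ok(())
    }
}

fn for_each_array_element<R, F>(reader: R, callback: F) -> serde_json::Result<()>
where
    R: Read,
    F: FnMut(serde_json::Value),
{
    let mut de = serde_json::Deserializer::from_reader(reader);
    de.deserialize_seq(ForEachElement(callback))?;
    de.end() // ensure there is no trailing garbage after the array
}

fn main() -> serde_json::Result<()> {
    let data = br#"[{"a": 1}, {"a": 2}, {"a": 3}]"#;
    // In the real reader, the callback could feed records into a batching
    // arrow-json decoder instead of printing them.
    for_each_array_element(&data[..], |record| println!("{record}"))?;
    Ok(())
}
```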
fn read_json_array_to_batches<R: Read>(
    mut reader: R,
    schema: SchemaRef,
    batch_size: usize,
) -> Result<Vec<RecordBatch>> {
    let mut content = String::new();
    reader.read_to_string(&mut content)?;

    // Parse JSON array
    let values: Vec<serde_json::Value> = serde_json::from_str(&content)
        .map_err(|e| DataFusionError::External(Box::new(e)))?;

    if values.is_empty() {
        return Ok(vec![RecordBatch::new_empty(schema)]);
    }

    // Convert to NDJSON string for arrow-json reader
    let ndjson: String = values
        .iter()
        .map(|v| v.to_string())
        .collect::<Vec<_>>()
        .join("\n");

    let cursor = std::io::Cursor::new(ndjson);
    let reader = ReaderBuilder::new(schema)
        .with_batch_size(batch_size)
        .build(cursor)?;

    reader.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
The read_json_array_to_batches function reads the entire file content into a String and then parses it using serde_json::from_str. While the documentation for JsonOptions correctly highlights that JSON array format requires loading the entire file into memory, this approach can lead to significant memory consumption and potential out-of-memory errors for very large JSON array files. If possible, explore alternative strategies for processing large JSON arrays in a streaming fashion to reduce peak memory usage, perhaps by iterating over serde_json::Value directly from a BufReader without loading the entire content into a single String first.
value:useful; category:security; feedback:The Gemini AI reviewer is correct! The proposed changes load and parse the whole JSON file when it is an array (in contrast to NDJSON). This may consume a lot of memory if the JSON file is big. Also, after parsing, it serializes the JSON objects to NDJSON format, and then this NDJSON is parsed again. It would be much better to parse the JSON array in a streaming fashion; this way it won't consume much memory and the work will be done just once.
Cursor Bugbot has reviewed your changes and found 3 potential issues.
            projected,
            FileCompressionType::UNCOMPRESSED,
            Arc::new(object_store),
            false,
Example passes wrong format flag for NDJSON data
High Severity
The json_opener example creates NDJSON data (newline-delimited JSON objects), but passes false for the newline_delimited parameter to JsonOpener::new. When newline_delimited is false, the reader expects JSON array format ([{...}, {...}]), not NDJSON. This mismatch causes the reader to fail because serde_json::from_str::<Vec<Value>> cannot parse NDJSON as a JSON array. The parameter should be true to match the actual data format.
value:useful; category:bug; feedback:The Bugbot AI reviewer is correct! The example uses NDJSON formatted data, so it should tell the file opener to use the newline_delimited format. Prevents breaking the example by trying to read an NDJSON file as a JSON array.
            compression: compression__.unwrap_or_default(),
            schema_infer_max_rec: schema_infer_max_rec__,
            compression_level: compression_level__,
            newline_delimited: newline_delimited__.unwrap_or_default(),
Proto deserialization defaults to wrong value for newline_delimited
Medium Severity
The newline_delimited field uses proto3 default (false) when deserializing, but the application default is true. The proto comment explicitly states "default true". When old serialized data (without this field) is deserialized, it will incorrectly default to JSON array mode (false) instead of the legacy NDJSON mode (true), breaking backward compatibility. This affects pbjson.rs using unwrap_or_default(), and direct proto field access in from_proto/mod.rs and file_formats.rs.
Additional Locations (2)
value:useful; category:bug; feedback:The Bugbot AI reviewer is correct! When deserializing an old .protobuf file that does not have data for the new newline_delimited field, it will be initialized with the bool data type's default value, which is false, and false maps to JSON array, not to NDJSON. Prevents a runtime error from trying to read an NDJSON file as a JSON array because the config property is initialized to the wrong value.
        // JSON array format: read content and extract records
        let mut content = String::new();
        reader.read_to_string(&mut content)?;
        infer_schema_from_json_array_content(&content, records_to_read)?
Schema inference ignores max records limit across multiple JSON array files
Medium Severity
For JSON array format, records_to_read is passed by value to infer_schema_from_json_array_content but never decremented afterward. In contrast, the NDJSON path uses a take_while closure that mutates records_to_read. This means when inferring schema from multiple JSON array files, the max records limit applies per-file rather than total. The break condition if records_to_read == 0 will never trigger, causing schema inference to potentially process far more records than the configured schema_infer_max_rec limit.
Additional Locations (1)
value:useful; category:bug; feedback:The Bugbot AI reviewer is correct! The new logic for inferring the Arrow schema from a JSON array file does not use records_to_read properly. It is not decremented after reading a file, so if the number of JSON objects is less than records_to_read, the next file is read with the same initial value of records_to_read. This way it may end up reading all JSON array files instead of stopping once the configured limit for schema inference is reached.
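A tiny standalone sketch of the across-files accounting the reviewers are asking for (names are illustrative, not DataFusion's actual code): each file consumes part of a single shared budget, so later files only see what is left.

```rust
/// Illustrative only: sample at most `records_to_read` records in total
/// across all files, rather than applying the full limit to each file.
fn sample_for_inference(files: &[Vec<String>], mut records_to_read: usize) -> Vec<String> {
    let mut sampled = Vec::new();
    for records in files {
        if records_to_read == 0 {
            break; // shared budget exhausted: stop scanning further files
        }
        let take = records.len().min(records_to_read);
        sampled.extend_from_slice(&records[..take]);
        records_to_read -= take; // decrement the shared budget
    }
    sampled
}

fn main() {
    let files = vec![
        vec![r#"{"a":1}"#.to_string(), r#"{"a":2}"#.to_string()],
        vec![r#"{"a":3}"#.to_string()],
    ];
    // With a budget of 2, the second file is never touched.
    assert_eq!(sample_for_inference(&files, 2).len(), 2);
}
```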
🤖 Augment PR Summary

Summary: This PR adds support for reading JSON files stored as a single JSON array in addition to the existing newline-delimited JSON (NDJSON) format.

Changes:
Technical Notes: JSON-array mode requires loading the entire file into memory and disables range-based scanning.
            projected,
            FileCompressionType::UNCOMPRESSED,
            Arc::new(object_store),
            false,
value:useful; category:bug; feedback:The Augment AI reviewer is correct! The example uses NDJSON formatted data, so it should tell the file opener to use the newline_delimited format. Prevents breaking the example by trying to read an NDJSON file as a JSON array.
        // JSON array format: read content and extract records
        let mut content = String::new();
        reader.read_to_string(&mut content)?;
        infer_schema_from_json_array_content(&content, records_to_read)?
In infer_schema, the JSON-array branch doesn’t decrement records_to_read (and doesn’t call take_while()), so schema_infer_max_rec won’t be respected across multiple files and the records_to_read == 0 early-break won’t trigger.
Other Locations
datafusion/datasource-json/src/file_format.rs:371
value:useful; category:bug; feedback:The Augment AI reviewer is correct! The new logic for inferring the Arrow schema from a JSON array file does not use records_to_read properly. It is not decremented after reading a file, so if the number of JSON objects is less than records_to_read, the next file is read with the same initial value of records_to_read. This way it may end up reading all JSON array files instead of stopping once the configured limit for schema inference is reached.
                    depth += 1;
                }
                '}' if !in_string => {
                    depth -= 1;
value:useful; category:bug; feedback:The Augment AI reviewer is correct! There is no final check that the same number of braces have been opened and closed, and likewise for the double quotes used to detect whether the scanner is inside a string. The finding prevents the parser from silently accepting invalid JSON.
  CompressionTypeVariant compression = 1; // Compression type
  optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
  optional uint32 compression_level = 3; // Optional compression level
  bool newline_delimited = 4; // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...]
In proto3, a new bool field defaults to false when absent on the wire, but the code/docs treat newline_delimited as defaulting to true; this likely breaks backwards-compatibility when decoding older serialized plans/options (missing field => now interpreted as JSON-array mode).
Other Locations
datafusion/proto-common/src/from_proto/mod.rs:1108
datafusion/proto-common/src/to_proto/mod.rs:993
datafusion/proto/src/logical_plan/file_formats.rs:244
datafusion/proto/src/logical_plan/file_formats.rs:264
value:useful; category:bug; feedback:The Augment AI reviewer is correct! When deserializing an old .protobuf file that does not have data for the new newline_delimited field, it will be initialized with the bool data type's default value, which is false, and false maps to JSON array, not to NDJSON. Prevents a runtime error from trying to read an NDJSON file as a JSON array because the config property is initialized to the wrong value.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (4)
datafusion/proto-common/proto/datafusion_common.proto (1)

468-473: Potential backward compatibility concern with proto3 default value. In proto3, unset bool fields default to false. However, the comment states the intended default is true (newline-delimited mode). When deserializing older protos that don't have this field set, newline_delimited will be false, potentially changing behavior from NDJSON to JSON array format unexpectedly. Consider using a wrapper type or an optional field to distinguish between "explicitly set to false" and "not set (use default true)":

💡 Suggested approach

 message JsonOptions {
   CompressionTypeVariant compression = 1; // Compression type
   optional uint64 schema_infer_max_rec = 2; // Optional max records for schema inference
   optional uint32 compression_level = 3; // Optional compression level
-  bool newline_delimited = 4; // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...]
+  optional bool newline_delimited = 4; // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...]
 }

Then in the Rust deserialization code, use proto_opts.newline_delimited.unwrap_or(true) to apply the correct default.

datafusion/proto-common/src/from_proto/mod.rs (1)
1097-1111: Handle the proto3 default value to maintain backward compatibility. As noted in the proto definition review, this direct assignment of proto_opts.newline_delimited will result in false when the field is unset in older protos, changing the default behavior from NDJSON to JSON array format. If the proto field is changed to optional bool, update this to apply the intended default:

💡 Suggested fix

 impl TryFrom<&protobuf::JsonOptions> for JsonOptions {
     type Error = DataFusionError;
     fn try_from(
         proto_opts: &protobuf::JsonOptions,
     ) -> datafusion_common::Result<Self, Self::Error> {
         let compression: protobuf::CompressionTypeVariant = proto_opts.compression();
         Ok(JsonOptions {
             compression: compression.into(),
             compression_level: proto_opts.compression_level,
             schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
-            newline_delimited: proto_opts.newline_delimited,
+            newline_delimited: proto_opts.newline_delimited.unwrap_or(true),
         })
     }
 }

datafusion-examples/examples/custom_data_source/csv_json_opener.rs (1)
123-129: Fix JSON opener mode to match NDJSON input. The example data is newline-delimited JSON, but the new flag is set to false (array mode). This will make the example fail at runtime unless the data is changed to an array.

🐛 Proposed fix (keep NDJSON input)

-            false,
+            true,

If the intent is to demonstrate JSON array mode, please update the data string to a JSON array and adjust the expected output accordingly.

datafusion/datasource-json/src/file_format.rs (1)
321-372: schema_infer_max_records is ignored for JSON arrays across files. In array mode, records_to_read is never decremented, so schema inference scans all files even when a small max is set. This can cause significant extra I/O and memory usage.

🐛 Proposed fix (track consumed records)

-fn infer_schema_from_json_array_content(
+fn infer_schema_from_json_array_content(
     content: &str,
     max_records: usize,
-) -> Result<Schema> {
+) -> Result<(Schema, usize)> {
     let records = extract_json_records(content)?;
     let records_to_infer: Vec<&str> = records
         .iter()
         .take(max_records)
         .map(|s| s.as_str())
         .collect();
+    let used = records_to_infer.len();
     if records_to_infer.is_empty() {
-        return Ok(Schema::empty());
+        return Ok((Schema::empty(), 0));
     }
@@
-    Ok(schema)
+    Ok((schema, used))
 }
@@
-            infer_schema_from_json_array_content(&content, records_to_read)?
+            let (schema, used) =
+                infer_schema_from_json_array_content(&content, records_to_read)?;
+            records_to_read = records_to_read.saturating_sub(used);
+            schema
@@
-            infer_schema_from_json_array_content(&content, records_to_read)?
+            let (schema, used) =
+                infer_schema_from_json_array_content(&content, records_to_read)?;
+            records_to_read = records_to_read.saturating_sub(used);
+            schema
🤖 Fix all issues with AI agents
In `@datafusion/datasource-json/src/file_format.rs`:
- Around line 214-262: extract_json_records currently can return partial records
on unbalanced braces/strings; modify the function (extract_json_records) to
validate completeness after the loop by checking that depth == 0, in_string ==
false, escape_next == false, and record_start is None, and if any of these
indicate unbalanced/malformed JSON return an
Err(DataFusionError::Execution(...)) with a clear message like "Malformed JSON
array: unbalanced braces or unterminated string" so malformed inputs are
rejected instead of silently producing partial records.
🧹 Nitpick comments (1)
datafusion/datasource-json/src/source.rs (1)
317-346: Consider memory efficiency for large JSON arrays. The current implementation parses the JSON array into serde_json::Value objects, then re-serializes to NDJSON format, resulting in ~3x memory usage (original string + parsed values + NDJSON string). This is pragmatic for reusing the Arrow NDJSON reader but may be a concern for large files. A future optimization could use serde_json::StreamDeserializer to process array elements incrementally without holding the entire parsed representation in memory.
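As a complementary sketch of the incremental direction this nitpick points at, the chunked conversion below feeds already-parsed values into arrow-json a batch at a time instead of building one large NDJSON string. It assumes arrow-json's ReaderBuilder::build_decoder / Decoder::serialize / Decoder::flush API; the function name and chunking scheme are illustrative, not the PR's actual code.

```rust
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_json::ReaderBuilder;
use arrow_schema::{ArrowError, DataType, Field, Schema};
use serde_json::Value;

/// Convert parsed JSON values into RecordBatches in `batch_size` chunks,
/// so only one chunk of values is buffered for decoding at a time.
fn values_to_batches(
    values: impl Iterator<Item = Value>,
    schema: Arc<Schema>,
    batch_size: usize,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
    let mut batches = Vec::new();
    let mut chunk = Vec::with_capacity(batch_size);

    for value in values {
        chunk.push(value);
        if chunk.len() == batch_size {
            decoder.serialize(&chunk)?;
            if let Some(batch) = decoder.flush()? {
                batches.push(batch);
            }
            chunk.clear();
        }
    }
    // Flush any remaining values that did not fill a full chunk.
    if !chunk.is_empty() {
        decoder.serialize(&chunk)?;
    }
    if let Some(batch) = decoder.flush()? {
        batches.push(batch);
    }
    Ok(batches)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let values = (1..=5).map(|i| serde_json::json!({ "a": i }));
    let batches = values_to_batches(values, schema, 2)?;
    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 5);
    Ok(())
}
```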
/// Extract JSON records from array format using bracket tracking.
///
/// This avoids full JSON parsing by only tracking brace depth to find
/// record boundaries. Much faster than serde_json::from_str() for large files.
fn extract_json_records(content: &str) -> Result<Vec<String>> {
    let content = content.trim();
    if !content.starts_with('[') || !content.ends_with(']') {
        return Err(DataFusionError::Execution(
            "JSON array format must start with '[' and end with ']'".to_string(),
        ));
    }

    // Remove outer brackets
    let inner = &content[1..content.len() - 1];
    let mut records = Vec::new();
    let mut depth = 0;
    let mut in_string = false;
    let mut escape_next = false;
    let mut record_start: Option<usize> = None;

    for (i, ch) in inner.char_indices() {
        if escape_next {
            escape_next = false;
            continue;
        }

        match ch {
            '\\' if in_string => escape_next = true,
            '"' => in_string = !in_string,
            '{' if !in_string => {
                if depth == 0 {
                    record_start = Some(i);
                }
                depth += 1;
            }
            '}' if !in_string => {
                depth -= 1;
                if depth == 0
                    && let Some(start) = record_start
                {
                    records.push(inner[start..=i].to_string());
                    record_start = None;
                }
            }
            _ => {}
        }
    }

    Ok(records)
Detect malformed JSON arrays instead of silently ignoring.
If braces/strings are unbalanced, extract_json_records returns partial records without error. That makes schema inference succeed on bad input and shifts failures later.
💡 Suggested fix
@@
     for (i, ch) in inner.char_indices() {
@@
         match ch {
@@
         }
     }
+    if depth != 0 || in_string || record_start.is_some() {
+        return Err(DataFusionError::Execution(
+            "Malformed JSON array: unbalanced object braces or unterminated string"
+                .to_string(),
+        ));
+    }
+
     Ok(records)
 }

🤖 Prompt for AI Agents
}🤖 Prompt for AI Agents
In `@datafusion/datasource-json/src/file_format.rs` around lines 214 - 262,
extract_json_records currently can return partial records on unbalanced
braces/strings; modify the function (extract_json_records) to validate
completeness after the loop by checking that depth == 0, in_string == false,
escape_next == false, and record_start is None, and if any of these indicate
unbalanced/malformed JSON return an Err(DataFusionError::Execution(...)) with a
clear message like "Malformed JSON array: unbalanced braces or unterminated
string" so malformed inputs are rejected instead of silently producing partial
records.
value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct! There is no final check that the same number of braces have been opened and closed, and likewise for the double quotes used to detect whether the scanner is inside a string. The finding prevents the parser from silently accepting invalid JSON.
value:useful; category:security; feedback:The Claude AI reviewer is correct! The proposed changes load and parse the whole JSON file when it is an array (in contrast to NDJSON). This may consume a lot of memory if the JSON file is big. Also, after parsing, it serializes the JSON objects to NDJSON format, and then this NDJSON is parsed again. It would be much better to parse the JSON array in a streaming fashion; this way it won't consume much memory and the work will be done just once.
value:useful; category:security; feedback:The CodeRabbit AI reviewer is correct! The proposed changes load and parse the whole JSON file when it is an array (in contrast to NDJSON). This may consume a lot of memory if the JSON file is big. Also, after parsing, it serializes the JSON objects to NDJSON format, and then this NDJSON is parsed again. It would be much better to parse the JSON array in a streaming fashion; this way it won't consume much memory and the work will be done just once.
value:annoying; category:bug; feedback: The Claude AI reviewer is not correct!
value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct! The example uses NDJSON formatted data, so it should tell the file opener to use the newline_delimited format. Prevents breaking the example by trying to read an NDJSON file as a JSON array.
value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct! The new logic for inferring the Arrow schema from a JSON array file does not use records_to_read properly, so the configured schema inference limit is not respected across multiple files.
value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct! When deserializing an old .protobuf file that does not have data for the new newline_delimited field, it will be initialized with the bool default value (false), which maps to JSON array mode instead of NDJSON.
value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct! When deserializing an old .protobuf file that does not have data for the new newline_delimited field, it will be initialized with the bool default value (false), which maps to JSON array mode instead of NDJSON.
19924: To review by AI