19924: Support JSON arrays reader/parse for datafusion#205
martin-augment wants to merge 1 commit into main
Conversation
Summary of Changes

Hello @martin-augment, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly extends DataFusion's capabilities for reading JSON data by introducing support for JSON files structured as a single array of objects. Previously, DataFusion primarily handled newline-delimited JSON; with the new `format_array` option, array-structured files can now be read as well.
Walkthrough

This pull request adds support for JSON array format handling alongside the existing line-delimited NDJSON format in DataFusion.
PR Review: Support JSON Arrays Reader/Parse for DataFusion

Overview

This PR adds support for reading JSON files in array format in addition to the existing line-delimited (NDJSON) format.

Overall Assessment: Good implementation with comprehensive testing. A few concerns need to be addressed before merging.

Code Quality and Best Practices
Strengths:
Issues:

Potential Bugs and Issues

Performance Considerations

Security Concerns

Test Coverage
Strengths:
Suggestions:

Recommendation
Must Fix:
Should Fix:
Nice to Have:

Great work overall! Main concerns are memory safety and performance for large files.
🤖 Augment PR Summary

Summary: This PR adds first-class support for reading JSON files whose top-level value is an array (e.g. `[{...}, {...}]`), in addition to the existing line-delimited format.

Changes:

Technical Notes: Array mode disables range-based scanning and currently requires full-file reads to parse/convert the array into record batches.

🤖 Was this summary useful? React with 👍 or 👎
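For orientation, here is a minimal, self-contained sketch of the array-format read path as the reviews below describe it (read the whole payload, parse the top-level array, re-serialize as NDJSON for arrow-json). Names and error handling are illustrative only, not the PR's exact code:

```rust
use std::io::{Cursor, Read};
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::json::ReaderBuilder;
use arrow::record_batch::RecordBatch;

// Simplified sketch (not the PR's exact code): read the whole file, parse the
// top-level JSON array, re-serialize as NDJSON, and reuse the line-delimited reader.
fn read_json_array_sketch<R: Read>(
    mut reader: R,
    schema: SchemaRef,
    batch_size: usize,
) -> Result<Vec<RecordBatch>, Box<dyn std::error::Error>> {
    let mut content = String::new();
    reader.read_to_string(&mut content)?;

    let values: Vec<serde_json::Value> = serde_json::from_str(&content)?;

    // One JSON object per line so the existing NDJSON reader can consume it.
    let ndjson = values
        .iter()
        .map(|v| v.to_string())
        .collect::<Vec<_>>()
        .join("\n");

    let json_reader = ReaderBuilder::new(schema)
        .with_batch_size(batch_size)
        .build(Cursor::new(ndjson))?;
    Ok(json_reader.collect::<Result<Vec<_>, _>>()?)
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let data = r#"[{"a": 1}, {"a": 2}]"#;
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let batches = read_json_array_sketch(data.as_bytes(), schema, 1024)?;
    assert_eq!(batches[0].num_rows(), 2);
    Ok(())
}
```

This also makes the trade-off visible: the whole file plus a parsed and a re-serialized copy are held in memory, which is what several of the review comments below push back on.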
```rust
infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
// ...
if is_array_format {
    infer_json_schema_from_json_array(&mut reader, records_to_read)?
```
In infer_schema, records_to_read is only decremented via the NDJSON take_while() path, so in JSON-array mode it never decreases and the schema_infer_max_rec limit / early break won’t apply across multiple files. This can cause schema inference to scan more records/files than requested and potentially infer additional fields unexpectedly.
🤖 Was this useful? React with 👍 or 👎
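For illustration, a self-contained sketch of the "global budget" behavior this comment asks for (hypothetical helper, not the PR's code): each file consumes part of `schema_infer_max_rec`, and once the budget is spent no further files are sampled.

```rust
use serde_json::Value;

// Hypothetical helper: sample array-format files against a single global budget.
fn sample_across_files(files: &[&str], mut records_to_read: usize) -> Vec<Value> {
    let mut sampled = Vec::new();
    for content in files {
        if records_to_read == 0 {
            break; // budget exhausted: stop instead of scanning every file
        }
        let values: Vec<Value> = serde_json::from_str(content).unwrap_or_default();
        let used = values.len().min(records_to_read);
        sampled.extend(values.into_iter().take(used));
        records_to_read -= used;
    }
    sampled
}

fn main() {
    let files = [r#"[{"a":1},{"a":2}]"#, r#"[{"b":3}]"#];
    // With a budget of 2, the second file is never sampled, so field "b"
    // would not leak into the inferred schema.
    assert_eq!(sample_across_files(&files, 2).len(), 2);
}
```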
```rust
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Whether the JSON file is in array format `[{...}, {...}]` instead of
/// line-delimited format. Defaults to `false`.
pub format_array: bool,
```
Adding pub format_array to the public NdJsonReadOptions struct is a semver-breaking change for downstream users that construct it with a struct literal. Consider whether NdJsonReadOptions should be #[non_exhaustive] (or otherwise avoid adding new public fields) to preserve forward compatibility.
🤖 Was this useful? React with 👍 or 👎
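A small illustration of the trade-off mentioned above, using hypothetical types rather than DataFusion's actual definitions: with `#[non_exhaustive]`, downstream crates cannot construct the struct with a literal, so adding fields later stops being a breaking change.

```rust
// Hypothetical stand-in for an options struct; not DataFusion's real definition.
#[non_exhaustive]
#[derive(Debug, Clone, Default)]
pub struct ReadOptions {
    pub schema_infer_max_rec: usize,
    pub format_array: bool, // new fields can be added without breaking callers
}

impl ReadOptions {
    pub fn new() -> Self {
        Self::default()
    }

    // Builder-style setters keep construction ergonomic without struct literals.
    pub fn with_format_array(mut self, format_array: bool) -> Self {
        self.format_array = format_array;
        self
    }
}

fn main() {
    // Outside the defining crate, `ReadOptions { .. }` literals would not compile,
    // so adding a field later is not a semver break for these callers.
    let opts = ReadOptions::new().with_format_array(true);
    println!("{opts:?}");
}
```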
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
datafusion/datasource-json/src/file_format.rs (1)
200-304: Schema inference limit isn’t enforced across files for array format.
In the array branch, `records_to_read` never decreases, so `schema_infer_max_rec` is applied per file instead of globally. This diverges from the line-delimited behavior and can over-read schemas when multiple files are provided.

🔧 Proposed fix (track how many array elements were consumed):
```diff
-fn infer_json_schema_from_json_array<R: Read>(
-    reader: &mut R,
-    max_records: usize,
-) -> std::result::Result<Schema, ArrowError> {
+fn infer_json_schema_from_json_array<R: Read>(
+    reader: &mut R,
+    max_records: usize,
+) -> std::result::Result<(Schema, usize), ArrowError> {
     let mut content = String::new();
     reader.read_to_string(&mut content).map_err(|e| {
         ArrowError::JsonError(format!("Failed to read JSON content: {e}"))
     })?;

     // Parse as JSON array using serde_json
     let values: Vec<serde_json::Value> = serde_json::from_str(&content)
         .map_err(|e| ArrowError::JsonError(format!("Failed to parse JSON array: {e}")))?;

     // Take only max_records for schema inference
-    let values_to_infer: Vec<_> = values.into_iter().take(max_records).collect();
+    let take = values.len().min(max_records);
+    let values_to_infer: Vec<_> = values.into_iter().take(take).collect();

-    if values_to_infer.is_empty() {
+    if take == 0 {
         return Err(ArrowError::JsonError(
             "JSON array is empty, cannot infer schema".to_string(),
         ));
     }

     // Use arrow's schema inference on the parsed values
-    infer_json_schema_from_iterator(values_to_infer.into_iter().map(Ok))
+    let schema = infer_json_schema_from_iterator(values_to_infer.into_iter().map(Ok))?;
+    Ok((schema, take))
 }
@@
-                if is_array_format {
-                    infer_json_schema_from_json_array(&mut reader, records_to_read)?
-                } else {
+                if is_array_format {
+                    let (schema, used) =
+                        infer_json_schema_from_json_array(&mut reader, records_to_read)?;
+                    records_to_read = records_to_read.saturating_sub(used);
+                    schema
+                } else {
                     let iter = ValueIter::new(&mut reader, None);
                     infer_json_schema_from_iterator(
                         iter.take_while(|_| take_while()),
                     )?
                 }
@@
-                if is_array_format {
-                    infer_json_schema_from_json_array(&mut reader, records_to_read)?
-                } else {
+                if is_array_format {
+                    let (schema, used) =
+                        infer_json_schema_from_json_array(&mut reader, records_to_read)?;
+                    records_to_read = records_to_read.saturating_sub(used);
+                    schema
+                } else {
                     let iter = ValueIter::new(&mut reader, None);
                     infer_json_schema_from_iterator(
                         iter.take_while(|_| take_while()),
                     )?
                 }
```
🧹 Nitpick comments (5)
datafusion-examples/examples/custom_data_source/csv_json_opener.rs (1)
123-129: Consider adding a comment to clarify the boolean parameter.

In example code, a bare `false` doesn't convey its purpose to readers. Since this PR introduces the `format_array` flag, adding a brief comment would help users understand the distinction between NDJSON and array-format JSON.

📝 Suggested improvement

```diff
 let opener = JsonOpener::new(
     8192,
     projected,
     FileCompressionType::UNCOMPRESSED,
     Arc::new(object_store),
-    false,
+    false, // format_array: false for NDJSON (newline-delimited), true for JSON array format
 );
```

datafusion/datasource-json/src/source.rs (4)
266-282: Consider optimizing memory usage for the streaming path.

The streaming path accumulates all bytes into a `Vec<u8>`, then the decompressed reader is passed to `read_json_array_to_batches`, which calls `read_to_string`, creating another copy. This results in multiple in-memory copies of the data. For large files, consider passing the bytes directly to avoid the extra copy, or using a streaming JSON parser that can handle arrays incrementally.
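A sketch of the "pass the bytes directly" variant, assuming the collected buffer is already decompressed (the helper name is hypothetical): parsing with `serde_json::from_slice` skips the extra `String` that `read_to_string` would add on top of the `Vec<u8>`.

```rust
use serde_json::Value;

// Hypothetical helper: parse the collected (already-decompressed) bytes directly.
fn parse_array_from_bytes(bytes: &[u8]) -> Result<Vec<Value>, serde_json::Error> {
    serde_json::from_slice(bytes)
}

fn main() -> Result<(), serde_json::Error> {
    let bytes = br#"[{"a": 1}, {"a": 2}]"#;
    let values = parse_array_from_bytes(bytes)?;
    assert_eq!(values.len(), 2);
    Ok(())
}
```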
308-308: Remove redundant import.
`ReaderBuilder` is already imported at line 40 via `use arrow::json::ReaderBuilder;`. This inner import is unnecessary.

Proposed fix

```diff
 fn read_json_array_to_batches<R: Read>(
     mut reader: R,
     schema: SchemaRef,
     batch_size: usize,
 ) -> Result<Vec<RecordBatch>> {
-    use arrow::json::ReaderBuilder;
-
     let mut content = String::new();
```
310-333: Consider adding a clearer error message for non-array JSON input.

If a user accidentally provides a JSON object (e.g., `{"key": "value"}`) instead of a JSON array, the current code produces a generic serde deserialization error. Consider wrapping it with a more descriptive error:

Proposed improvement

```diff
     // Parse JSON array
-    let values: Vec<serde_json::Value> = serde_json::from_str(&content)
-        .map_err(|e| DataFusionError::External(Box::new(e)))?;
+    let values: Vec<serde_json::Value> = serde_json::from_str(&content).map_err(|e| {
+        DataFusionError::External(format!(
+            "Failed to parse JSON array (ensure input is a JSON array, not an object): {e}"
+        ).into())
+    })?;
```
321-326: Memory optimization opportunity: avoid double serialization.

The current implementation parses the entire JSON array into `Vec<serde_json::Value>`, then serializes each value back to strings to create NDJSON. This involves:

- `content` String (file content)
- `Vec<serde_json::Value>` (parsed representation)
- `ndjson` String (re-serialized)

For large files, this can use ~3x the file size in peak memory.
A more efficient approach would be to write directly to a buffer while iterating, or consider if Arrow's RawDecoder can handle the values directly without the NDJSON round-trip.
Slightly improved version using a single buffer
```diff
-    // Convert to NDJSON string for arrow-json reader
-    let ndjson: String = values
-        .iter()
-        .map(|v| v.to_string())
-        .collect::<Vec<_>>()
-        .join("\n");
+    // Convert to NDJSON format for arrow-json reader
+    let mut ndjson = String::new();
+    for (i, v) in values.iter().enumerate() {
+        if i > 0 {
+            ndjson.push('\n');
+        }
+        // Write directly to ndjson to avoid intermediate Vec<String>
+        use std::fmt::Write;
+        write!(&mut ndjson, "{}", v).expect("String write cannot fail");
+    }
```
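If avoiding the NDJSON round-trip entirely is worth it, arrow-json's `Decoder` (formerly `RawDecoder`) can be fed serde-serializable rows directly. A sketch, assuming a reasonably recent arrow version where `ReaderBuilder::build_decoder` and `Decoder::serialize` are available:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::json::ReaderBuilder;
use arrow::record_batch::RecordBatch;
use serde_json::Value;

// Sketch: feed parsed serde_json::Values straight into arrow-json's Decoder,
// so no intermediate NDJSON string is built.
fn values_to_batches(
    values: &[Value],
    schema: Arc<Schema>,
    batch_size: usize,
) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
    let mut batches = Vec::new();
    for chunk in values.chunks(batch_size) {
        decoder.serialize(chunk)?; // serialize rows directly into the decoder
        if let Some(batch) = decoder.flush()? {
            batches.push(batch);
        }
    }
    Ok(batches)
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
    let values: Vec<Value> = serde_json::from_str(r#"[{"a": 1}, {"a": 2}, {"a": 3}]"#)
        .map_err(|e| ArrowError::JsonError(e.to_string()))?;
    let batches = values_to_batches(&values, schema, 2)?;
    assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 3);
    Ok(())
}
```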
Cursor Bugbot has reviewed your changes and found 1 potential issue.
```rust
infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
// ...
if is_array_format {
    infer_json_schema_from_json_array(&mut reader, records_to_read)?
```
Schema inference ignores record limit across files
Medium Severity
When inferring schema from multiple JSON array files with schema_infer_max_rec configured, the code doesn't properly track the remaining records to read across files. Unlike the NDJSON path which decrements records_to_read via the take_while closure, the array format path passes records_to_read to infer_json_schema_from_json_array but never updates it afterward. This causes all files to be processed instead of stopping after reading the specified maximum number of records.
Code Review
This pull request introduces support for reading JSON files in array format, in addition to the existing newline-delimited JSON (NDJSON) format. The changes include adding a format_array option to JsonOptions and NdJsonReadOptions, updating the schema inference logic, and modifying the data reading path to handle JSON arrays. New test cases have been added to cover the new functionality, including empty arrays, schema inference limits, data reading, projection, and compression. The protobuf definitions and their serialization/deserialization logic have also been updated to reflect these changes.
```rust
reader.read_to_string(&mut content).map_err(|e| {
    ArrowError::JsonError(format!("Failed to read JSON content: {e}"))
})?;
```
The infer_json_schema_from_json_array function reads the entire file content into a String here. For very large JSON array files, this could lead to Out-Of-Memory (OOM) issues, especially during schema inference where only a subset of records might be needed. Consider streaming the data or using a more memory-efficient parsing approach if possible, particularly for schema inference where max_records is intended to limit the data scanned.
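One memory-bounded shape for the inference step (a sketch, not the PR's code): deserialize at most `max_records` elements into `serde_json::Value`s and parse-but-discard the rest, so peak memory is bounded by the sample rather than by the whole array. The helper and type names below are illustrative.

```rust
use serde::de::{DeserializeSeed, Deserializer, IgnoredAny, SeqAccess, Visitor};
use serde_json::Value;
use std::fmt;
use std::io::Read;

/// Deserializes at most `max_records` elements of a top-level JSON array,
/// parsing (but discarding) the remainder so memory stays bounded by the cap.
struct BoundedArray {
    max_records: usize,
}

impl<'de> DeserializeSeed<'de> for BoundedArray {
    type Value = Vec<Value>;

    fn deserialize<D>(self, deserializer: D) -> Result<Self::Value, D::Error>
    where
        D: Deserializer<'de>,
    {
        struct BoundedVisitor(usize);

        impl<'de> Visitor<'de> for BoundedVisitor {
            type Value = Vec<Value>;

            fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(f, "a JSON array")
            }

            fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<Self::Value, A::Error> {
                let mut kept = Vec::new();
                while kept.len() < self.0 {
                    match seq.next_element::<Value>()? {
                        Some(v) => kept.push(v),
                        None => return Ok(kept),
                    }
                }
                // Parse but discard the tail so the deserializer reaches the closing `]`.
                while seq.next_element::<IgnoredAny>()?.is_some() {}
                Ok(kept)
            }
        }

        deserializer.deserialize_seq(BoundedVisitor(self.max_records))
    }
}

fn sample_json_array<R: Read>(reader: R, max_records: usize) -> serde_json::Result<Vec<Value>> {
    let mut de = serde_json::Deserializer::from_reader(reader);
    BoundedArray { max_records }.deserialize(&mut de)
}

fn main() -> serde_json::Result<()> {
    let data = r#"[{"a": 1}, {"a": 2}, {"a": 3, "b": true}]"#;
    let sample = sample_json_array(data.as_bytes(), 2)?;
    assert_eq!(sample.len(), 2); // only the first two rows are materialized
    Ok(())
}
```

This still reads the whole file from the reader, but only the sampled prefix is held as parsed values, which is what matters for schema inference with `schema_infer_max_rec`.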
```rust
// For streaming, we need to collect all bytes first
let bytes = s
    .map_err(DataFusionError::from)
    .try_fold(Vec::new(), |mut acc, chunk| async move {
        acc.extend_from_slice(&chunk);
        Ok(acc)
    })
    .await?;
```
For streaming payloads, the entire stream is collected into a Vec<u8> before processing. This can lead to Out-Of-Memory (OOM) errors for large JSON array files, similar to the schema inference logic. While format_array is a new feature, handling large files efficiently is crucial for a data processing framework. Consider if there's a way to process the stream incrementally without loading the entire file into memory.
```rust
let mut content = String::new();
reader.read_to_string(&mut content)?;
```
The read_json_array_to_batches function reads the entire file content into a String and then parses it with serde_json::from_str. This approach, while functional, can be memory-intensive for large files. Additionally, converting the parsed serde_json::Values back into an NDJSON string and then using arrow::json::ReaderBuilder to build record batches introduces an unnecessary intermediate serialization/deserialization step. It would be more efficient to directly convert serde_json::Value into Arrow arrays or to use a parser that can directly produce Arrow arrays from a JSON array structure without intermediate string conversion.
Superseded by #206
19924: To review by AI