19924: Support JSON arrays reader/parse for datafusion #211
Conversation
Co-authored-by: Martin Grigorov <[email protected]>
Summary of Changes

Hello @martin-augment, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request expands DataFusion's capability to ingest JSON data by adding support for JSON array format files. By implementing a streaming conversion to newline-delimited JSON, the system can now process large JSON array datasets without excessive memory consumption, giving users more flexibility when working with diverse JSON data structures.
Walkthrough

This pull request adds support for reading JSON array format alongside the existing newline-delimited JSON (NDJSON) support. A new streaming converter, `JsonArrayToNdjsonReader`, exposes JSON array input as NDJSON so the existing JSON reader path can consume it.
Pull Request Review: JSON Array Support (#211)

Summary

This PR adds support for reading JSON array format alongside the existing newline-delimited JSON support.
Code Review
This pull request adds support for reading JSON array format to DataFusion, which is a valuable feature enhancement. The implementation is well-structured, introducing a streaming JsonArrayToNdjsonReader for memory-efficient processing of file-based sources. The changes are comprehensive, including configuration updates, core implementation changes, protobuf modifications, and a thorough test suite.
My review focuses on improving correctness and maintainability. I've pointed out a misleading documentation comment about memory usage, a correctness issue regarding the validation of malformed JSON, and a case where garbage data after a JSON array might be ignored. Additionally, I've provided suggestions to reduce code duplication. Overall, this is a solid contribution.
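To make the intended behavior concrete, here is a small usage sketch of the converter described above; the import path is assumed from the `pub use` re-export discussed later in this review, and the exact whitespace of the emitted NDJSON is illustrative rather than guaranteed.

```rust
use std::io::Read;

use datafusion_datasource_json::JsonArrayToNdjsonReader;

fn main() -> std::io::Result<()> {
    // A JSON array document, as produced by many REST APIs.
    let input = br#"[{"key1": 1, "key2": "val"}, {"key1": 2, "key2": "vals"}]"#;

    // The converter exposes the array as newline-delimited JSON, which the
    // existing arrow-json reader already understands.
    let mut reader = JsonArrayToNdjsonReader::new(&input[..]);
    let mut ndjson = String::new();
    reader.read_to_string(&mut ndjson)?;

    // Expected shape (one object per line):
    // {"key1": 1, "key2": "val"}
    // {"key1": 2, "key2": "vals"}
    println!("{ndjson}");
    Ok(())
}
```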
```rust
/// - Unbalanced braces/brackets (depth != 0)
/// - Unterminated string
/// - Missing closing `]`
pub fn validate_complete(&self) -> std::io::Result<()> {
```
The validate_complete method is a great addition for ensuring the JSON array is well-formed. However, it's currently unused, which could lead to silently accepting malformed JSON files (e.g., truncated files or files with unbalanced brackets). This is a correctness issue.
This method should be called when the reader is fully consumed. A good way to implement this would be to add a flag to JsonArrayToNdjsonReader to control validation on EOF, since schema inference might not read the whole file.
For example:

- Add a `validate_on_eof: bool` field to `JsonArrayToNdjsonReader`.
- Update `new` to `pub fn new(reader: R, validate_on_eof: bool) -> Self`.
- In `fill_internal_buffer`, when EOF is detected (`bytes_read == 0`), call `validate_complete` if the flag is set:

```rust
// in fill_internal_buffer
let bytes_read = self.inner.read(&mut raw_buf)?;
if bytes_read == 0 {
    if self.validate_on_eof {
        self.validate_complete()?;
    }
    break; // EOF
}
```

- Update call sites: `JsonOpener::open` would call `JsonArrayToNdjsonReader::new(..., true)`, while `infer_schema_from_json_array` would call it with `false`.
value:useful; category:bug; feedback:The Gemini AI reviewer is correct! The validate_complete() method is not used and this may lead to accepting an invalid JSON as valid. Prevents runtime errors while parsing the JSON due to missing validation while pre-processing the JSON.
```rust
/// Note: JSON array format requires loading the entire file into memory.
/// For large files, newline-delimited format is recommended.
```
The comment Note: JSON array format requires loading the entire file into memory. is a bit misleading. The implementation uses a streaming converter (JsonArrayToNdjsonReader) for file-based sources, which is memory-efficient. While it's true that for Stream payloads the entire stream is buffered into memory first, for file-based sources (a very common case), it is streaming.
Consider rephrasing to be more precise. For example, you could mention that streaming sources might be buffered. The documentation in datafusion/datasource-json/src/file_format.rs is more accurate on this point: Note: JSON array format is processed using streaming conversion, which is memory-efficient even for large files.
value:useful; category:bug; feedback: The Gemini AI reviewer is correct! The new version of the proposed changes still loads the complete JSON array file into memory for GetResultPayload::Stream, and this may lead to out-of-memory errors for huge files. The file is loaded as a stream from the ObjectStore, so it should be processed as a stream. Prevents out-of-memory failures when loading a big JSON array file.
```diff
 let (schema, records_consumed) = match r.payload {
     #[cfg(not(target_arch = "wasm32"))]
     GetResultPayload::File(file, _) => {
         let decoder = file_compression_type.convert_read(file)?;
-        let mut reader = BufReader::new(decoder);
-        let iter = ValueIter::new(&mut reader, None);
-        infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+        let reader = BufReader::new(decoder);
+
+        if newline_delimited {
+            // NDJSON: use ValueIter directly
+            let iter = ValueIter::new(reader, None);
+            let mut count = 0;
+            let schema =
+                infer_json_schema_from_iterator(iter.take_while(|_| {
+                    let should_take = count < records_to_read;
+                    if should_take {
+                        count += 1;
+                    }
+                    should_take
+                }))?;
+            (schema, count)
+        } else {
+            // JSON array format: use streaming converter
+            infer_schema_from_json_array(reader, records_to_read)?
+        }
     }
     GetResultPayload::Stream(_) => {
         let data = r.bytes().await?;
         let decoder = file_compression_type.convert_read(data.reader())?;
-        let mut reader = BufReader::new(decoder);
-        let iter = ValueIter::new(&mut reader, None);
-        infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+        let reader = BufReader::new(decoder);
+
+        if newline_delimited {
+            let iter = ValueIter::new(reader, None);
+            let mut count = 0;
+            let schema =
+                infer_json_schema_from_iterator(iter.take_while(|_| {
+                    let should_take = count < records_to_read;
+                    if should_take {
+                        count += 1;
+                    }
+                    should_take
+                }))?;
+            (schema, count)
+        } else {
+            // JSON array format: use streaming converter
+            infer_schema_from_json_array(reader, records_to_read)?
+        }
     }
 };
```
There's significant code duplication between the GetResultPayload::File and GetResultPayload::Stream match arms for handling newline_delimited. You could extract the logic for choosing between NDJSON and JSON array schema inference into a helper function to reduce duplication and improve maintainability.
A new function infer_schema_from_ndjson could be created, similar to infer_schema_from_json_array.
```rust
fn infer_schema_from_ndjson<R: Read>(
    reader: R,
    max_records: usize,
) -> Result<(Schema, usize)> {
    let iter = ValueIter::new(reader, None);
    let mut count = 0;
    let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
        let should_take = count < max_records;
        if should_take {
            count += 1;
        }
        should_take
    }))?;
    Ok((schema, count))
}
```

Then the duplicated block could be simplified to:

```rust
let reader = BufReader::new(decoder);
if newline_delimited {
    infer_schema_from_ndjson(reader, records_to_read)?
} else {
    infer_schema_from_json_array(reader, records_to_read)?
}
```
value:good-to-have; category:bug; feedback:The Gemini AI reviewer is correct! There is code duplication in the blocks for File and Stream inputs which would be good to extract into a helper function and reuse. Prevents double maintenance and bugs being fixed in only one of the places.
```rust
if newline_delimited {
    // NDJSON: use BufReader directly
    let reader = BufReader::new(bytes);
    let arrow_reader = ReaderBuilder::new(schema)
        .with_batch_size(batch_size)
        .build(reader)?;

    Ok(futures::stream::iter(arrow_reader)
        .map(|r| r.map_err(Into::into))
        .boxed())
} else {
    // JSON array format: wrap with streaming converter
    // JsonArrayToNdjsonReader implements BufRead
    let ndjson_reader = JsonArrayToNdjsonReader::new(bytes);
    let arrow_reader = ReaderBuilder::new(schema)
        .with_batch_size(batch_size)
        .build(ndjson_reader)?;

    Ok(futures::stream::iter(arrow_reader)
        .map(|r| r.map_err(Into::into))
        .boxed())
}
```
There is some code duplication in the GetResultPayload::File match arm for handling newline_delimited. You can refactor this to make it more concise.
Since JsonArrayToNdjsonReader implements BufRead, you could refactor this section to avoid repeating the ReaderBuilder and stream creation logic.
```rust
let reader: Box<dyn std::io::BufRead + Send> = if newline_delimited {
    Box::new(std::io::BufReader::new(bytes))
} else {
    Box::new(JsonArrayToNdjsonReader::new(bytes))
};
let arrow_reader = ReaderBuilder::new(schema)
    .with_batch_size(batch_size)
    .build(reader)?;
Ok(futures::stream::iter(arrow_reader)
    .map(|r| r.map_err(Into::into))
    .boxed())
```
value:good-to-have; category:bug; feedback:The Gemini AI reviewer is correct! There is code duplication in the blocks for File and Stream inputs which would be good to extract into a helper function and reuse. Prevents double maintenance and bugs being fixed in only one of the places.
```rust
            }
        }
    }
    JsonArrayState::Done => None,
```
The current implementation of JsonArrayToNdjsonReader stops processing when it encounters the closing ]. Any characters (including non-whitespace garbage) after the closing bracket will be ignored. This could lead to silently accepting invalid JSON files.
Consider modifying the state machine to handle this. For example, after reaching the Done state, any subsequent non-whitespace characters should result in an InvalidData error.
You could modify fill_internal_buffer to check for this:
```rust
// in fill_internal_buffer
for &byte in &raw_buf[..bytes_read] {
    if self.state == JsonArrayState::Done {
        if !byte.is_ascii_whitespace() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "Found garbage after JSON array.",
            ));
        }
        continue;
    }
    if let Some(transformed) = self.process_byte(byte)
    // ...
```

This will make the parser stricter and more correct.
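If the stricter check above were adopted, a small unit test could pin the behavior down. This is only a sketch: it assumes both the suggested trailing-garbage validation and the `JsonArrayToNdjsonReader::new(reader)` signature from this PR.

```rust
#[test]
fn trailing_garbage_after_array_is_rejected() {
    use std::io::Read;

    // Valid array followed by non-whitespace garbage.
    let input = br#"[{"a": 1}] trailing"#;
    let mut reader = JsonArrayToNdjsonReader::new(&input[..]);

    let mut out = String::new();
    // With the proposed check, reading past the closing `]` fails fast.
    let err = reader.read_to_string(&mut out).unwrap_err();
    assert_eq!(err.kind(), std::io::ErrorKind::InvalidData);
}
```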
🤖 Augment PR Summary

Summary: Adds support for reading JSON data in either newline-delimited JSON (NDJSON) or JSON array form.

Tests: Adds unit tests and SQLLogicTests plus new JSON fixtures for array/empty-array cases.

🤖 Was this summary useful? React with 👍 or 👎
```protobuf
CompressionTypeVariant compression = 1;     // Compression type
optional uint64 schema_infer_max_rec = 2;   // Optional max records for schema inference
optional uint32 compression_level = 3;      // Optional compression level
bool newline_delimited = 4;                 // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...]
```
```rust
}

let bytes_read = self.inner.read(&mut raw_buf)?;
if bytes_read == 0 {
```
value:useful; category:bug; feedback:The Augment AI reviewer is correct! The validate_complete() method is not used and this may lead to accepting an invalid JSON as valid. Prevents runtime errors while parsing the JSON due to missing validation while pre-processing the JSON.
````rust
/// ]
/// ```
///
/// Note: JSON array format requires loading the entire file into memory.
````
value:useful; category:bug; feedback: The Augment AI reviewer is correct! The new version of the proposed changes still loads the complete JSON array file into memory for GetResultPayload::Stream, and this may lead to out-of-memory errors for huge files. The file is loaded as a stream from the ObjectStore, so it should be processed as a stream. Prevents out-of-memory failures when loading a big JSON array file.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
```rust
compression: compression.into(),
compression_level: proto_opts.compression_level,
schema_infer_max_rec: proto_opts.schema_infer_max_rec.map(|h| h as usize),
newline_delimited: proto_opts.newline_delimited,
```
Proto deserialization defaults break backward compatibility for JSON format
Medium Severity
The newline_delimited field in the proto JsonOptions uses bool (not optional bool), which defaults to false in proto3. When deserializing old proto messages that lack this field, the value becomes false (JSON array format) instead of the expected true (NDJSON). The Rust JsonOptions correctly defaults to true, and the pbjson deserializer handles this with unwrap_or(true), but the prost binary deserialization paths directly use proto_opts.newline_delimited without any default handling. Additionally, JsonOptionsProto::default() returns newline_delimited = false. This breaks backward compatibility and could cause NDJSON files to be incorrectly read as JSON array format.
Additional Locations (2)
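A self-contained way to see the decoding behavior Bugbot describes is to round-trip an "old" message through a schema that has the new field. The two structs below are hypothetical stand-ins for the generated `JsonOptions` types, not the real generated code.

```rust
use prost::Message;

// Hypothetical stand-in for JsonOptions before this PR (no field 4).
#[derive(Clone, PartialEq, Message)]
struct JsonOptionsV1 {
    #[prost(uint64, optional, tag = "2")]
    schema_infer_max_rec: Option<u64>,
}

// Hypothetical stand-in for JsonOptions after this PR (plain bool field 4).
#[derive(Clone, PartialEq, Message)]
struct JsonOptionsV2 {
    #[prost(uint64, optional, tag = "2")]
    schema_infer_max_rec: Option<u64>,
    #[prost(bool, tag = "4")]
    newline_delimited: bool,
}

fn main() {
    // A message encoded before the field existed...
    let old = JsonOptionsV1 { schema_infer_max_rec: Some(100) };
    let bytes = old.encode_to_vec();

    // ...decodes with the new schema to the proto3 zero value `false`,
    // i.e. "JSON array format" instead of the intended NDJSON default.
    let new = JsonOptionsV2::decode(bytes.as_slice()).unwrap();
    assert!(!new.newline_delimited);
}
```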
```rust
}
if self.newline_delimited {
    struct_ser.serialize_field("newlineDelimited", &self.newline_delimited)?;
}
```
Pbjson serialization/deserialization asymmetry loses false value
Medium Severity
The pbjson serialization for newline_delimited only includes the field when the value is true (line 4609: if self.newline_delimited { serialize... }), but deserialization defaults missing fields to true (line 4721: unwrap_or(true)). This causes a roundtrip bug: serializing JsonOptions with newline_delimited = false omits the field, and deserializing the result gives newline_delimited = true. The serialization condition is inverted from what's needed to preserve the non-default value.
Additional Locations (1)
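The asymmetry is easy to reproduce outside of the generated code. The sketch below models the reported guard and default with two hand-written helpers; the names and the use of `serde_json` are illustrative, not the real pbjson output.

```rust
use serde_json::{json, Map, Value};

// Mirrors the reported serializer guard: the field is written only when true.
fn serialize(newline_delimited: bool) -> Value {
    let mut obj = Map::new();
    if newline_delimited {
        obj.insert("newlineDelimited".to_string(), json!(newline_delimited));
    }
    Value::Object(obj)
}

// Mirrors the reported deserializer: a missing field defaults to true.
fn deserialize(v: &Value) -> bool {
    v.get("newlineDelimited")
        .and_then(Value::as_bool)
        .unwrap_or(true)
}

fn main() {
    let original = false; // JSON array format
    let wire = serialize(original); // {} -- the false value is dropped
    let roundtripped = deserialize(&wire); // true -- silently back to NDJSON
    assert_ne!(original, roundtripped);
}
```

Serializing the field whenever it differs from the Rust-side default (`true`), rather than whenever it is truthy, would make the round trip lossless.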
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
datafusion/proto-common/src/from_proto/mod.rs (1)

1097-1109: Default missing `newline_delimited` to `true` for backward compatibility.

The `newline_delimited` field in the proto is currently a scalar bool, which defaults to `false` when absent. This breaks backward compatibility—older serialized `JsonOptions` without this field will incorrectly deserialize as `false` instead of the intended default of `true`. Making the proto field optional and defaulting to `true` in deserialization is the correct approach, already implemented in the JSON serialization path. The from_proto conversion should follow the same pattern.

🔧 Proposed fix (after making proto field optional)

```diff
- newline_delimited: proto_opts.newline_delimited,
+ newline_delimited: proto_opts.newline_delimited.unwrap_or(true),
```

datafusion/proto-common/proto/datafusion_common.proto (1)

468-473: Make `newline_delimited` field optional to preserve backward compatibility with binary serialization.

Proto3 scalar bools default to `false` when missing from binary-encoded messages, which breaks backward compatibility: older serialized JsonOptions (from before this field existed) will deserialize as `newline_delimited = false` instead of the intended default `true`. Change the field to `optional bool` to allow proper default handling in deserialization logic.

🔧 Proposed proto change

```diff
- bool newline_delimited = 4; // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...]
+ optional bool newline_delimited = 4; // Whether to read as newline-delimited JSON (default true). When false, expects JSON array format [{},...]
```

Update the `from_proto` conversion to treat `None` as `true`:

```rust
newline_delimited: proto_opts.newline_delimited.unwrap_or(true),
```
🤖 Fix all issues with AI agents
In `@datafusion/common/src/config.rs`:
- Around line 3068-3086: The doc comment for the config field newline_delimited
misleadingly states that JSON array format "requires loading the entire file
into memory"; update the comment to reflect actual behavior by softening this
claim and referencing JsonArrayToNdjsonReader: change the note to say JSON array
format "can be more memory‑intensive than NDJSON and may require buffering
depending on the reader implementation (e.g., JsonArrayToNdjsonReader streams
arrays to NDJSON to reduce memory use)" or similar, so readers understand that
streaming mitigates the memory requirement while NDJSON is generally more
memory‑efficient for very large files; edit the comment attached to pub
newline_delimited: bool, default = true accordingly.
In `@datafusion/datasource-json/src/source.rs`:
- Around line 298-314: The current stream path unconditionally collects the
entire stream via the try_fold into a Vec<u8>, risking OOM; modify the
accumulation logic used before calling file_compression_type.convert_read so
that it enforces a configurable maximum size (e.g., MAX_JSON_ARRAY_BYTES) and
returns a clear DataFusionError if exceeded, or alternatively, when the source
indicates a file-like stream, write the incoming chunks to a temporary file and
pass that file into file_compression_type.convert_read/JsonArrayToNdjsonReader
to avoid in-memory buffering; update the code surrounding the
s.map_err(...).try_fold(...) and the subsequent use of JsonArrayToNdjsonReader
and file_compression_type.convert_read to implement the cap-or-spill-to-disk
behavior and surface a clear error message when the limit is hit.
In `@datafusion/datasource-json/src/utils.rs`:
- Around line 238-262: In fill_internal_buffer, avoid dropping transformed bytes
by limiting each read to the remaining capacity (use remaining =
self.buffer.len() - write_pos and read at most min(DEFAULT_BUF_SIZE, remaining)
into raw_buf) instead of always reading DEFAULT_BUF_SIZE, and after a read that
returns 0 (EOF) validate the JSON array state (if self.state !=
JsonArrayState::Done return an error/Err indicating unexpected EOF) so malformed
arrays fail fast; update the loop around self.inner.read, the for byte in
&raw_buf[..bytes_read] processing, and any early breaks in process_byte to
respect write_pos and the JsonArrayState::Done check.
🧹 Nitpick comments (2)
datafusion/datasource-json/src/mod.rs (1)

26-29: Consider whether `JsonArrayToNdjsonReader` should be part of the public API.

The re-export at line 29 makes `JsonArrayToNdjsonReader` part of the crate's public API surface. This is appropriate if external users may need this utility for custom JSON array processing. However, if it's primarily an internal implementation detail for the JSON format handling, consider keeping it internal (removing the `pub use` re-export while keeping `pub mod utils` for internal crate access).

If intentionally public, this is a clean API addition that follows the existing re-export pattern in this file.
datafusion-examples/examples/custom_data_source/csv_json_opener.rs (1)

123-129: Prefer a named flag for readability in the example.

The trailing `true` is a magic boolean; consider binding it to a named variable or adding a short comment.

💡 Suggested tweak

```diff
-    let opener = JsonOpener::new(
-        8192,
-        projected,
-        FileCompressionType::UNCOMPRESSED,
-        Arc::new(object_store),
-        true,
-    );
+    let newline_delimited = true;
+    let opener = JsonOpener::new(
+        8192,
+        projected,
+        FileCompressionType::UNCOMPRESSED,
+        Arc::new(object_store),
+        newline_delimited,
+    );
```
````rust
/// The JSON format to use when reading files.
///
/// When `true` (default), expects newline-delimited JSON (NDJSON):
/// ```text
/// {"key1": 1, "key2": "val"}
/// {"key1": 2, "key2": "vals"}
/// ```
///
/// When `false`, expects JSON array format:
/// ```text
/// [
///   {"key1": 1, "key2": "val"},
///   {"key1": 2, "key2": "vals"}
/// ]
/// ```
///
/// Note: JSON array format requires loading the entire file into memory.
/// For large files, newline-delimited format is recommended.
pub newline_delimited: bool, default = true
````
Clarify the JSON array memory note.
The comment says JSON array format requires loading the entire file into memory. If the new JsonArrayToNdjsonReader streams, this wording is misleading. Consider softening to “may be more memory‑intensive than NDJSON” or align it with the actual behavior.
✏️ Suggested doc tweak

```diff
- /// Note: JSON array format requires loading the entire file into memory.
- /// For large files, newline-delimited format is recommended.
+ /// Note: JSON array parsing can be more memory-intensive than NDJSON.
+ /// For large files, NDJSON is recommended.
```
🤖 Prompt for AI Agents
In `@datafusion/common/src/config.rs` around lines 3068 - 3086, The doc comment
for the config field newline_delimited misleadingly states that JSON array
format "requires loading the entire file into memory"; update the comment to
reflect actual behavior by softening this claim and referencing
JsonArrayToNdjsonReader: change the note to say JSON array format "can be more
memory‑intensive than NDJSON and may require buffering depending on the reader
implementation (e.g., JsonArrayToNdjsonReader streams arrays to NDJSON to reduce
memory use)" or similar, so readers understand that streaming mitigates the
memory requirement while NDJSON is generally more memory‑efficient for very
large files; edit the comment attached to pub newline_delimited: bool, default =
true accordingly.
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The new version of the proposed changes still loads the complete JSON array file into memory, and this may lead to out-of-memory errors for huge files. The file is loaded as a stream from the ObjectStore, so it should be processed as a stream. Prevents out-of-memory failures when loading a big JSON array file.
```rust
// JSON array format from stream: collect bytes first, then use streaming converter
// Note: We still need to collect for streams, but the converter avoids
// additional memory overhead from serde_json parsing
let bytes = s
    .map_err(DataFusionError::from)
    .try_fold(Vec::new(), |mut acc, chunk| async move {
        acc.extend_from_slice(&chunk);
        Ok(acc)
    })
    .await?;

let decompressed = file_compression_type
    .convert_read(std::io::Cursor::new(bytes))?;

// Use streaming converter - it implements BufRead
let ndjson_reader = JsonArrayToNdjsonReader::new(decompressed);
```
🧩 Analysis chain

🏁 Scripts executed against the repository (martin-augment/datafusion):

```shell
# First, let's check the project structure and find the Cargo.toml for the datasource-json crate
find . -name "Cargo.toml" -path "*/datasource-json/*" | head -5

# Check the current file and its context
wc -l datafusion/datasource-json/src/source.rs

# List the dependencies in the datasource-json crate
cat datafusion/datasource-json/Cargo.toml

# Find the JsonArrayToNdjsonReader implementation
rg "JsonArrayToNdjsonReader" -A 20 datafusion/datasource-json/src/

# Check what traits JsonArrayToNdjsonReader requires
ast-grep --pattern 'struct JsonArrayToNdjsonReader { $$$ }'

# Check the context around lines 298-314 to see what type `s` is and full context
sed -n '280,330p' datafusion/datasource-json/src/source.rs

# Check if tokio-util is available as a dependency anywhere in the workspace
rg "tokio-util" --type toml

# Look for how other parts of the code handle stream-to-read conversions
rg "StreamReader|SyncIoBridge" --type rs

# Check the file_compression_type.convert_read signature to understand what it expects
rg "convert_read" -A 5 --type rs | head -40

# Fix rg command - use correct language identifier for Rust
rg "convert_read" -A 5 | head -50

# Find the type of the stream `s` and understand the signature better
sed -n '280,290p' datafusion/datasource-json/src/source.rs | cat -n

# Look for convert_read and convert_stream implementations
rg "fn convert_read|fn convert_stream" -B 2 -A 8 | head -80

# Check if SyncIoBridge is a real thing in tokio-util 0.7.17
# (web search: SyncIoBridge tokio-util 0.7.17)

# Look for how file_compression_type is used elsewhere in the codebase
rg "file_compression_type\.convert" -B 2 -A 2 | head -50
```
🌐 Web query: tokio-util 0.7.17 SyncIoBridge StreamReader

💡 Result:

In tokio-util 0.7.x (incl. 0.7.17), these two adapters live in `tokio_util::io`:

- `StreamReader`: converts a `Stream<Item = Result<B, E>>` (where `B: bytes::Buf`, `E: Into<std::io::Error>`) into an `AsyncRead` (also `AsyncBufRead`). You build it with `StreamReader::new(stream)`.
- `SyncIoBridge`: converts an `AsyncRead`/`AsyncWrite` into synchronous `std::io::Read`/`std::io::Write` by blocking on a Tokio runtime handle. It must be used from a blocking thread (e.g. inside `tokio::task::spawn_blocking`); calling it from an async context can panic.

To use them together (Stream → AsyncRead → std::io::Read):

```rust
use tokio::task::spawn_blocking;
use tokio_util::io::{StreamReader, SyncIoBridge};
use tokio_stream::StreamExt; // if you need map()
use bytes::Bytes;
use std::io::Read;

async fn consume_stream_sync<S, E>(stream: S) -> std::io::Result<Vec<u8>>
where
    S: futures_core::Stream<Item = Result<Bytes, E>> + Send + 'static,
    E: Into<std::io::Error> + Send + 'static,
{
    let reader = StreamReader::new(stream);          // Stream -> AsyncRead
    let mut sync_reader = SyncIoBridge::new(reader); // AsyncRead -> Read
    spawn_blocking(move || {
        let mut out = Vec::new();
        sync_reader.read_to_end(&mut out)?;
        Ok(out)
    })
    .await?
}
```

Cargo features (0.7.17): enable `io-util` (it pulls in `io`) to get `SyncIoBridge`; `StreamReader` is under `io`.
🌐 Web query: tokio-util 0.7 io module available types

💡 Result:

In tokio-util 0.7, the `tokio_util::io` module (enabled with the `io` feature) includes these items:

- Submodule: `simplex` (unidirectional byte-oriented channel)
- Structs: `CopyToBytes`, `InspectReader`, `InspectWriter`, `ReaderStream`, `SinkWriter`, `StreamReader`, `SyncIoBridge` (requires the `io-util` feature)
- Functions: `poll_read_buf`, `poll_write_buf`, `read_buf`, `read_exact_arc` (requires the `io-util` feature)
🏁 Scripts executed:

```shell
# Check what actual dependencies datasource-json has, versus datasource
cat datafusion/datasource-json/Cargo.toml | grep -A 50 "\[dependencies\]"

# Look at the type signature of the stream `s` more carefully - what's returned by GetResultPayload::Stream
rg "GetResultPayload" -B 5 -A 5 | head -60

# Check if there are any async-to-sync adapters used elsewhere in the codebase
rg "StreamReader\|to_sync\|sync.*adapter" | head -20

# Verify the async context and return type more carefully
sed -n '240,260p' datafusion/datasource-json/src/source.rs

# Check if there are any examples of spawn_blocking used in similar contexts
rg "spawn_blocking" -B 3 -A 3 | head -40
```
Avoid unbounded memory buffering for JSON array streams.
The stream path collects the entire file into Vec<u8>, which can grow without bounds and cause OOM for large arrays. This undermines the streaming design of JsonArrayToNdjsonReader.
Instead: add an explicit size cap and return a clear error if exceeded, or prefer the file-based code path when the stream represents a file. If neither is practical, document the limitation and memory trade-off clearly.
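One way to implement the cap, sketched under assumptions rather than taken from the PR: `collect_capped` and `max_bytes` are made-up names, and a real implementation would likely wire the limit to session configuration.

```rust
use bytes::Bytes;
use datafusion_common::DataFusionError;
use futures::{Stream, TryStreamExt};

/// Buffer an object-store byte stream, failing cleanly once `max_bytes`
/// would be exceeded instead of growing the Vec without bound.
async fn collect_capped<S>(stream: S, max_bytes: usize) -> Result<Vec<u8>, DataFusionError>
where
    S: Stream<Item = Result<Bytes, object_store::Error>>,
{
    stream
        .map_err(DataFusionError::from)
        .try_fold(Vec::new(), |mut acc, chunk| async move {
            if acc.len() + chunk.len() > max_bytes {
                return Err(DataFusionError::ResourcesExhausted(format!(
                    "JSON array payload exceeds {max_bytes} bytes; \
                     consider newline-delimited JSON for large inputs"
                )));
            }
            acc.extend_from_slice(&chunk);
            Ok(acc)
        })
        .await
}
```

The Stream arm could then call such a helper (or spill to a temporary file) before handing the bytes to `JsonArrayToNdjsonReader`.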
🤖 Prompt for AI Agents
In `@datafusion/datasource-json/src/source.rs` around lines 298 - 314, The current
stream path unconditionally collects the entire stream via the try_fold into a
Vec<u8>, risking OOM; modify the accumulation logic used before calling
file_compression_type.convert_read so that it enforces a configurable maximum
size (e.g., MAX_JSON_ARRAY_BYTES) and returns a clear DataFusionError if
exceeded, or alternatively, when the source indicates a file-like stream, write
the incoming chunks to a temporary file and pass that file into
file_compression_type.convert_read/JsonArrayToNdjsonReader to avoid in-memory
buffering; update the code surrounding the s.map_err(...).try_fold(...) and the
subsequent use of JsonArrayToNdjsonReader and file_compression_type.convert_read
to implement the cap-or-spill-to-disk behavior and surface a clear error message
when the limit is hit.
value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The new version of the proposed changes still loads the complete JSON array file into memory, and this may lead to out-of-memory errors for huge files. The file is loaded as a stream from the ObjectStore, so it should be processed as a stream. Prevents out-of-memory failures when loading a big JSON array file.
```rust
// Read raw data from inner reader
let mut raw_buf = vec![0u8; DEFAULT_BUF_SIZE];
let mut write_pos = 0;

loop {
    if write_pos >= self.buffer.len() {
        break;
    }

    let bytes_read = self.inner.read(&mut raw_buf)?;
    if bytes_read == 0 {
        break; // EOF
    }

    for &byte in &raw_buf[..bytes_read] {
        if self.state == JsonArrayState::Done {
            break;
        }
        if let Some(transformed) = self.process_byte(byte)
            && write_pos < self.buffer.len()
        {
            self.buffer[write_pos] = transformed;
            write_pos += 1;
        }
    }
```
Prevent dropped bytes and enforce EOF validation in fill_internal_buffer.
Reading a full raw chunk while the output buffer has limited remaining space can drop transformed bytes, and EOF before a closing ] currently returns Ok(0) (silently accepting malformed or misformatted input). Limit reads to the remaining capacity and validate on EOF so invalid arrays fail fast.
🐛 Proposed fix

```diff
-            let bytes_read = self.inner.read(&mut raw_buf)?;
-            if bytes_read == 0 {
-                break; // EOF
-            }
+            let remaining = self.buffer.len() - write_pos;
+            let bytes_read = self.inner.read(&mut raw_buf[..remaining])?;
+            if bytes_read == 0 {
+                // Only validate when we have no buffered output left to emit
+                if write_pos == 0 {
+                    self.validate_complete()?;
+                }
+                break; // EOF
+            }
```

🤖 Prompt for AI Agents
In `@datafusion/datasource-json/src/utils.rs` around lines 238 - 262, In
fill_internal_buffer, avoid dropping transformed bytes by limiting each read to
the remaining capacity (use remaining = self.buffer.len() - write_pos and read
at most min(DEFAULT_BUF_SIZE, remaining) into raw_buf) instead of always reading
DEFAULT_BUF_SIZE, and after a read that returns 0 (EOF) validate the JSON array
state (if self.state != JsonArrayState::Done return an error/Err indicating
unexpected EOF) so malformed arrays fail fast; update the loop around
self.inner.read, the for byte in &raw_buf[..bytes_read] processing, and any
early breaks in process_byte to respect write_pos and the JsonArrayState::Done
check.
value:useful; category:bug; feedback:The CodeRabbit AI reviewer is correct! The validate_complete() method is not used and this may lead to accepting an invalid JSON as valid. Prevents runtime errors while parsing the JSON due to missing validation while pre-processing the JSON.
value:useful; category:bug; feedback:The Claude AI reviewer is correct! The validate_complete() method is not used and this may lead to accepting an invalid JSON as valid. Prevents runtime errors while parsing the JSON due to missing validation while pre-processing the JSON.
19924: To review by AI