
Conversation

@martin-augment
Owner

19924: To review by AI

@coderabbitai

coderabbitai bot commented Feb 3, 2026

Walkthrough

This pull request adds support for reading JSON data in both newline-delimited (NDJSON) and JSON array formats across the DataFusion ecosystem. The changes include: renaming NdJsonReadOptions to JsonReadOptions with a new newline_delimited boolean field; extending JsonFormat, JsonOpener, and JsonSource with format selection capabilities; implementing a JsonArrayToNdjsonReader utility for streaming JSON array-to-NDJSON conversion; updating protobuf definitions to persist the newline_delimited flag; adding test data and comprehensive tests for JSON array parsing; and updating all dependent code paths and examples to use the new API.
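
For illustration, a minimal sketch of the new API surface described above (a sketch only, based on the JsonReadOptions struct and newline_delimited builder introduced in this PR; the file paths are placeholders):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Default (newline_delimited = true): the existing NDJSON behavior is unchanged.
    let ndjson_df = ctx
        .read_json("data/example.ndjson", JsonReadOptions::default())
        .await?;

    // Opting into JSON array format ([{...}, {...}]) via the new flag.
    let array_df = ctx
        .read_json(
            "data/json_array.json",
            JsonReadOptions::default().newline_delimited(false),
        )
        .await?;

    ndjson_df.show().await?;
    array_df.show().await?;
    Ok(())
}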


Comment @coderabbitai help to get the list of available commands and usage tips.

@gemini-code-assist

Summary of Changes

Hello @martin-augment, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces comprehensive support for reading JSON files in array format within DataFusion, complementing the existing newline-delimited JSON capabilities. The core change involves a new streaming mechanism that efficiently converts JSON arrays to a newline-delimited format on the fly, enabling processing of large files without excessive memory usage. This is exposed through a unified JsonReadOptions and a new configuration setting, providing users with flexible control over JSON data ingestion. The changes ensure DataFusion can handle a broader range of JSON data sources while maintaining performance and memory efficiency.

Highlights

  • JSON Array Format Support: DataFusion now supports reading JSON files in array format (e.g., [{}, {}]) in addition to the existing newline-delimited JSON (NDJSON) format. This significantly expands the types of JSON data that can be directly queried.
  • Streaming Conversion for Memory Efficiency: A new JsonArrayToNdjsonReader utility has been introduced, which performs a streaming, on-the-fly conversion of JSON array files to NDJSON. This allows processing of very large JSON array files without loading the entire file into memory, ensuring memory efficiency.
  • Unified JSON Read Options: The NdJsonReadOptions struct has been deprecated and replaced by a more general JsonReadOptions. This new struct includes a newline_delimited flag (defaulting to true) to specify whether the input is NDJSON or JSON array format. This simplifies configuration and provides a single interface for all JSON reading.
  • New Configuration Option: A new configuration option, json.newline_delimited, has been added to ConfigOptions to control the expected JSON format. This can be set via CREATE EXTERNAL TABLE statements using OPTIONS ('format.newline_delimited' 'false'); a sketch of this usage follows this list.
  • Enhanced Schema Inference: The schema inference logic for JSON files has been updated to correctly infer schemas from both NDJSON and JSON array formats, leveraging the new streaming conversion for array-formatted files.
  • No Range-Based Scanning for JSON Arrays: It's important to note that JSON array format does not support range-based file scanning. Attempts to use repartitioning with JSON array files will now result in an error, guiding users to disable repartition_file_scans or use NDJSON.
  • Robust Testing and Utilities: Extensive unit and SQL logic tests have been added to validate the new JSON array parsing capabilities, covering schema inference, basic queries, null handling, unnesting, nested structs, compression, and stream cancellation. New internal utilities like ChannelReader facilitate robust async-to-sync data transfer for streaming.
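
As referenced in the configuration highlight above, a minimal sketch of the SQL path (sketch only: the table name and file path are placeholders; the option key follows the 'format.newline_delimited' syntax used in this PR's sqllogictest cases):

use datafusion::prelude::*;

async fn register_json_array_table(ctx: &SessionContext) -> datafusion::error::Result<()> {
    // Register a JSON-array file as an external table using the new format option.
    ctx.sql(
        "CREATE EXTERNAL TABLE json_array_table \
         STORED AS JSON \
         LOCATION 'data/json_array.json' \
         OPTIONS ('format.newline_delimited' 'false')",
    )
    .await?;

    // The table can now be queried like any other.
    ctx.sql("SELECT * FROM json_array_table").await?.show().await?;
    Ok(())
}
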
Changelog
  • Cargo.lock
    • Added log dependency.
    • Added serde_json dependency.
    • Added tokio-stream dependency.
    • Added tokio-util dependency.
  • Cargo.toml
    • Added tokio-stream dependency with sync feature.
    • Added tokio-util dependency.
  • datafusion-examples/examples/custom_data_source/csv_json_opener.rs
    • Updated json_opener function call to include the new newline_delimited parameter, setting it to true.
  • datafusion/common/src/config.rs
    • Added newline_delimited: bool field to JsonOptions with a default value of true.
    • Included documentation for the newline_delimited option, explaining both NDJSON and JSON array formats.
  • datafusion/core/src/dataframe/mod.rs
    • Replaced NdJsonReadOptions::default() with JsonReadOptions::default() in read_json example.
  • datafusion/core/src/datasource/file_format/json.rs
    • Removed import of NdJsonReadOptions and added import of JsonReadOptions.
    • Added several test helper functions (create_temp_json, infer_json_array_schema, query_json_array, query_json_array_str) for JSON array format testing.
    • Replaced NdJsonReadOptions with JsonReadOptions in existing tests.
    • Removed outdated comments regarding RequiresMoreData and empty file expectations.
    • Updated JsonFormat documentation to describe support for both line-delimited and array JSON formats, including streaming conversion notes.
    • Added with_newline_delimited and is_newline_delimited methods to JsonFormat.
    • Introduced infer_schema_from_json_array function for streaming schema inference from JSON array format.
    • Modified infer_schema implementation to conditionally use infer_json_schema_from_iterator or infer_schema_from_json_array based on the newline_delimited option.
    • Updated file_source to pass the newline_delimited property to JsonSource.
  • datafusion/core/src/datasource/file_format/options.rs
    • Deprecated NdJsonReadOptions and introduced JsonReadOptions as its replacement, with a type alias for backward compatibility.
    • Added newline_delimited: bool field to JsonReadOptions with a default of true.
    • Added a builder method newline_delimited to JsonReadOptions for setting the format.
    • Updated the ReadOptions trait implementation for JsonReadOptions to pass the newline_delimited flag to the JsonFormat.
  • datafusion/core/src/datasource/listing/table.rs
    • Replaced NdJsonReadOptions::default() with JsonReadOptions::default() in tests.
  • datafusion/core/src/datasource/physical_plan/json.rs
    • Replaced NdJsonReadOptions with JsonReadOptions in imports and test code.
  • datafusion/core/src/execution/context/json.rs
    • Replaced NdJsonReadOptions with JsonReadOptions in imports and function signatures for read_json and register_json.
  • datafusion/core/src/prelude.rs
    • Replaced NdJsonReadOptions with JsonReadOptions in the public exports.
  • datafusion/core/tests/data/json_array.json
    • Added a new test data file containing JSON data in array format.
  • datafusion/core/tests/data/json_empty_array.json
    • Added a new test data file containing an empty JSON array.
  • datafusion/core/tests/dataframe/mod.rs
    • Removed NdJsonReadOptions from imports.
    • Replaced NdJsonReadOptions::default() with JsonReadOptions::default() in tests.
  • datafusion/datasource-json/Cargo.toml
    • Added log = "0.4.29" dependency.
    • Added serde_json = "1.0.149" dependency.
    • Added tokio-stream = { workspace = true, features = ["sync"] } dependency.
  • datafusion/datasource-json/src/file_format.rs
    • Updated module description to include array JSON support.
    • Added Read and Buf imports from std::io and bytes::Buf, Bytes.
    • Updated JsonFormat documentation to reflect support for both formats.
    • Added with_newline_delimited and is_newline_delimited methods to JsonFormat.
    • Implemented infer_schema_from_json_array for streaming schema inference from JSON arrays.
    • Modified infer_schema to use the newline_delimited flag to choose between direct ValueIter and infer_schema_from_json_array.
    • Updated file_source to pass the newline_delimited property to JsonSource.
  • datafusion/datasource-json/src/mod.rs
    • Added pub mod utils; to expose new utility functions.
  • datafusion/datasource-json/src/source.rs
    • Updated module description to include JSON array format support.
    • Added Pin, Context, Poll from std::task, SpawnedTask from datafusion_common_runtime, and RecordBatch from arrow::array.
    • Imported ChannelReader and JsonArrayToNdjsonReader from the new utils module.
    • Defined CHANNEL_BUFFER_SIZE and JSON_CONVERTER_BUFFER_SIZE constants.
    • Introduced JsonArrayStream struct to manage spawned tasks for streaming JSON array processing.
    • Added newline_delimited: bool field to JsonOpener and JsonSource.
    • Modified JsonOpener::new and JsonSource::with_newline_delimited to accept and set the newline_delimited flag.
    • Updated JsonOpener::open to implement logic for both NDJSON and JSON array formats. For JSON arrays, it uses JsonArrayToNdjsonReader and ChannelReader with spawn_blocking for streaming conversion.
    • Added a check in JsonOpener::open to return an error if range-based scanning is attempted on JSON array format files.
    • Added extensive unit tests for JsonOpener covering JSON array reading from files and streams, nested objects, empty arrays, large files, and stream cancellation.
  • datafusion/datasource-json/src/utils.rs
    • New file containing JsonArrayToNdjsonReader and ChannelReader.
    • JsonArrayToNdjsonReader is a streaming reader that converts JSON array format to NDJSON on-the-fly, managing its own input/output buffers and handling JSON parsing state (depth, strings, escapes).
    • ChannelReader is a synchronous Read implementation that receives Bytes chunks from an mpsc::Receiver, bridging async object store streams to synchronous Read interfaces.
    • Includes detailed documentation and unit tests for both utilities, covering various JSON structures, buffer boundaries, and error conditions.
  • datafusion/proto-common/proto/datafusion_common.proto
    • Added optional bool newline_delimited = 4; to the JsonOptions message.
  • datafusion/proto-common/src/from_proto/mod.rs
    • Updated TryFrom<&protobuf::JsonOptions> for JsonOptions to map the newline_delimited field, defaulting to true if not present.
  • datafusion/proto-common/src/generated/pbjson.rs
    • Updated serde::Serialize and serde::Deserialize implementations for JsonOptions to include the newline_delimited field.
  • datafusion/proto-common/src/generated/prost.rs
    • Added newline_delimited field to the JsonOptions struct with #[prost(bool, optional, tag = "4")].
  • datafusion/proto-common/src/to_proto/mod.rs
    • Updated TryFrom<&JsonOptions> for protobuf::JsonOptions to map the newline_delimited field.
  • datafusion/proto/src/generated/datafusion_proto_common.rs
    • Added newline_delimited field to the JsonOptions struct with #[prost(bool, optional, tag = "4")].
  • datafusion/proto/src/logical_plan/file_formats.rs
    • Updated JsonOptionsProto::from(&JsonOptions) and JsonOptions::from(&JsonOptionsProto) implementations to handle the newline_delimited field.
  • datafusion/sqllogictest/test_files/json.slt
    • Added new SQL logic tests for reading JSON array format files.
    • Demonstrated CREATE EXTERNAL TABLE with OPTIONS ('format.newline_delimited' 'false').
    • Included a test case showing that reading a JSON array file without specifying newline_delimited='false' (i.e., using the default NDJSON parser) results in a Not valid JSON error, as expected.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@claude

claude bot commented Feb 3, 2026

Comprehensive Review of PR #226: JSON Arrays Reader/Parse Support

Overview

This PR adds support for reading JSON array format [{...}, {...}] in addition to the existing newline-delimited JSON (NDJSON) format. The implementation uses a streaming conversion approach that is memory-efficient for large files.


1. Code Quality and Best Practices ⭐⭐⭐⭐

✅ Strengths

  • Excellent Documentation: Comprehensive inline documentation with ASCII art diagrams explaining architecture and memory management
  • Backward Compatibility: Proper deprecation of NdJsonReadOptions with clear migration path
  • Comprehensive Testing: Extensive test coverage including edge cases, buffer boundaries, and async cancellation
  • Consistent API Design: Clear naming conventions and follows DataFusion patterns

⚠️ Areas for Improvement

1. Error Propagation Issue (datafusion/datasource-json/src/source.rs:1265-1268)

Err(e) => {
    log::error!("Error reading JSON stream: {e}");
    break;
}

Issue: Errors during stream reading are only logged, not propagated to users. This can result in confusing "incomplete JSON array" errors instead of showing the actual IO error.

Recommendation: Send errors through result_tx so users see what actually went wrong.

2. Large Method Complexity (datafusion/datasource-json/src/utils.rs:1825-1901)
The process_byte method is quite large with nested match statements. Consider extracting logic into helper methods like process_byte_in_string() and process_byte_outside_string().

3. Missing Validation (utils.rs:831-858)
The infer_schema_from_json_array function doesn't call validate_complete() after reading. Malformed JSON arrays might pass schema inference but fail during actual reading, leading to confusing errors.


2. Potential Bugs or Issues

🔴 Critical Issues

1. Incomplete Error Propagation (source.rs:1265-1268)
As noted above, IO errors aren't propagated to the parsing task, leading to poor error messages.

2. Hardcoded Buffer Sizes (source.rs:1015-1021)

const CHANNEL_BUFFER_SIZE: usize = 128;
const JSON_CONVERTER_BUFFER_SIZE: usize = 2 * 1024 * 1024;

These are not configurable. For very large objects (e.g., 10MB strings), this could cause issues. Consider making these configurable or adaptive.

⚠️ Medium Priority Issues

1. Depth Counter Type (utils.rs:1740)
Using i32 for depth tracking. Consider using usize which is more idiomatic and avoids potential negative depth issues with malformed JSON.

2. No Timeout for Channel Operations (source.rs:1240-1270)
Blocking operations have no timeout. If producer/consumer hangs, this could cause indefinite blocking.

3. Removed Useful Comments (datafusion/core/src/datasource/file_format/json.rs:180-181)
Comments explaining expected behavior were removed without explanation. These provided useful context for future maintainers.


3. Performance Considerations

✅ Good Decisions

  • Streaming Architecture: Constant memory usage regardless of file size
  • Zero-Copy Where Possible: Uses Bytes type for efficient slicing
  • Parallel Processing: Async read and blocking parse tasks run concurrently

⚠️ Performance Concerns

1. Byte-by-Byte Processing (utils.rs, process_byte)
Processing JSON character-by-character could be 2-3x slower than native NDJSON parsing. Consider chunk-based processing or SIMD for better performance in future optimizations.

2. No Parallel File Scanning (source.rs:1148-1155)
JSON array format cannot be partitioned for parallel reading. This is a fundamental limitation but means large JSON array files will be slower than equivalent NDJSON files.

Recommendation: Document this limitation prominently in user-facing documentation.

3. Double Buffering (utils.rs:1778-1792)
Both input and output buffers exist simultaneously, doubling memory usage. This is necessary for the design but worth noting.


4. Security Concerns 🔒

🔴 High Priority

1. No Denial of Service Protection (utils.rs)
The parser has no limits on:

  • Maximum nesting depth (only limited by i32::MAX)
  • Maximum string length
  • Maximum array size

A malicious JSON file with extreme nesting like [[[[[...10000 levels...]]]] could cause stack overflow or CPU exhaustion.

Recommendation: Add configurable limits:

const MAX_NESTING_DEPTH: i32 = 64;
const MAX_STRING_LENGTH: usize = 100 * 1024 * 1024; // 100MB
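
A sketch of how such a depth limit could be enforced (hypothetical helper, not part of this PR; the converter would need to call it wherever it increments depth):

// Hypothetical helper: turns pathological nesting into an InvalidData error
// instead of unbounded work.
fn check_depth(depth: i32, max_depth: i32) -> std::io::Result<()> {
    if depth > max_depth {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("JSON nesting depth {depth} exceeds the configured limit of {max_depth}"),
        ));
    }
    Ok(())
}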

2. Unbounded Memory Allocation (source.rs:1273-1299)
The blocking task has no memory limits. A malicious JSON array could cause excessive memory allocation.

Recommendation: Integrate with DataFusion's memory manager to add pressure checks.

⚠️ Medium Priority

1. No Validation of Trailing Content
While the code detects trailing content after ], validation isn't called in the main read path, so malformed files might be partially processed without error.

✅ Security Strengths

  • Proper string escape handling
  • No unsafe code blocks
  • Safe Rust throughout

5. Test Coverage ✅

Excellent Coverage Includes:

  • Unit tests for conversion logic, edge cases, and buffer boundaries
  • Integration tests with schema inference, queries, compression
  • SQL logic tests for external tables and error scenarios
  • Stream cancellation tests for async safety

Missing Test Coverage:

  1. Concurrent access: Multiple queries reading the same file
  2. Very large individual objects: 100MB objects in the array
  3. Malicious input:
    • Deeply nested JSON (1000+ levels)
    • Very long strings (1GB)
    • Invalid UTF-8
  4. Error recovery: Resource cleanup after mid-stream failures
  5. Benchmarks: Performance comparison between NDJSON and JSON array formats

6. API Design and Breaking Changes

✅ Good API Design

  • Non-Breaking: Defaults maintain backward compatibility
  • Clear deprecation path for NdJsonReadOptions
  • Consistent with DataFusion patterns

⚠️ API Concerns

1. Deprecation Timeline: The PR deprecates NdJsonReadOptions but doesn't specify a removal version. Please add a clear timeline.

2. Configuration Inconsistency (config.rs:73)
There's both a session-level newline_delimited config and a format-specific option. The precedence between these is unclear.

3. Breaking Change in Internal API: JsonOpener::new() now requires an additional parameter. This breaks direct construction but appears to be internal-only.


Summary and Recommendations

Critical Actions Required ⚠️

  1. Fix error propagation in async read task to show actual IO errors
  2. Add DoS protection with max nesting depth and size limits
  3. Add validation in schema inference to catch malformed JSON early

High Priority Actions

  1. Add tests for malicious/pathological input (deep nesting, huge strings)
  2. Document performance implications of JSON array vs NDJSON
  3. Clarify config precedence between session-level and format-level options

Medium Priority Actions

  1. Add benchmarks comparing NDJSON and JSON array performance
  2. Use usize for depth counter instead of i32
  3. Add timeout handling for channel operations
  4. Restore removed comments or document why they were removed
  5. Specify deprecation timeline for NdJsonReadOptions

Overall Assessment

Quality: ⭐⭐⭐⭐ (4/5)

This is a well-implemented feature with excellent test coverage and documentation. The streaming architecture is appropriate for large files, and the backward-compatible API design is commendable.

Recommendation: Approve with conditions - Please address the critical issues (error propagation, DoS protection, validation in schema inference) before merging. The high-priority items can be addressed in follow-up PRs if time is a constraint.

Great work on this feature! The implementation shows careful consideration of memory efficiency and backward compatibility. 🎉

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request adds support for reading JSON array format in DataFusion, in addition to the existing newline-delimited JSON (NDJSON) support. This is a significant feature enhancement. The implementation includes a streaming parser for JSON arrays, which is memory-efficient. The NdJsonReadOptions has been renamed to JsonReadOptions to reflect this change, with a deprecated type alias for backward compatibility. The changes are well-tested with a comprehensive new test suite. My review focuses on the correctness and maintainability of the new implementation. I've found one high-severity issue related to error propagation and one medium-severity issue regarding code duplication.

Comment on lines +381 to +384
Err(e) => {
log::error!("Error reading JSON stream: {e}");
break;
}


Severity: high

When an error occurs while reading from the object store stream, it is logged, but not propagated to the consumer of the RecordBatch stream. The stream will simply end, or the parser might fail with a potentially misleading JSON parsing error. The underlying I/O error should be propagated to the output stream so the user is aware of the root cause.

You can fix this by cloning the result_tx sender before spawning the tasks and using it here to send the error. For example:

// Before spawning tasks
let (result_tx, result_rx) = tokio::sync::mpsc::channel(2);
let result_tx_clone = result_tx.clone();

Then use result_tx_clone in the read_task as suggested.

                                    Err(e) => {
                                        log::error!("Error reading JSON stream: {e}");
                                        // Propagate the error to the output stream.
                                        let arrow_error = arrow::error::ArrowError::ExternalError(Box::new(e));
                                        if result_tx_clone.send(Err(arrow_error)).await.is_err() {
                                            // Receiver has been dropped, which is fine.
                                        }
                                        break;
                                    }

Owner Author

value:useful; category:bug; feedback: The Gemini AI reviewer is correct! JSON array read errors are currently only logged and the caller is never informed, so using the partially parsed data later leads to another, misleading error. Propagating the error prevents it from being hidden until someone tries to use the problematic data.

Comment on lines +286 to 332
 let (schema, records_consumed) = match r.payload {
     #[cfg(not(target_arch = "wasm32"))]
     GetResultPayload::File(file, _) => {
         let decoder = file_compression_type.convert_read(file)?;
-        let mut reader = BufReader::new(decoder);
-        let iter = ValueIter::new(&mut reader, None);
-        infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+        let reader = BufReader::new(decoder);
+
+        if newline_delimited {
+            // NDJSON: use ValueIter directly
+            let iter = ValueIter::new(reader, None);
+            let mut count = 0;
+            let schema =
+                infer_json_schema_from_iterator(iter.take_while(|_| {
+                    let should_take = count < records_to_read;
+                    if should_take {
+                        count += 1;
+                    }
+                    should_take
+                }))?;
+            (schema, count)
+        } else {
+            // JSON array format: use streaming converter
+            infer_schema_from_json_array(reader, records_to_read)?
+        }
     }
     GetResultPayload::Stream(_) => {
         let data = r.bytes().await?;
         let decoder = file_compression_type.convert_read(data.reader())?;
-        let mut reader = BufReader::new(decoder);
-        let iter = ValueIter::new(&mut reader, None);
-        infer_json_schema_from_iterator(iter.take_while(|_| take_while()))?
+        let reader = BufReader::new(decoder);
+
+        if newline_delimited {
+            let iter = ValueIter::new(reader, None);
+            let mut count = 0;
+            let schema =
+                infer_json_schema_from_iterator(iter.take_while(|_| {
+                    let should_take = count < records_to_read;
+                    if should_take {
+                        count += 1;
+                    }
+                    should_take
+                }))?;
+            (schema, count)
+        } else {
+            // JSON array format: use streaming converter
+            infer_schema_from_json_array(reader, records_to_read)?
+        }
     }
 };


Severity: medium

The logic for inferring the schema for newline-delimited JSON is duplicated between the GetResultPayload::File and GetResultPayload::Stream match arms. This can be refactored into a helper function to improve maintainability and reduce code duplication. The helper function could take a generic Read trait object.

Owner Author

value:good-to-have; category:bug; feedback: The Gemini AI reviewer is correct! There is duplicated code in the branches that process files and streams from the ObjectStore; it could be extracted into a helper function to avoid double maintenance and possible divergence.

@augmentcode

augmentcode bot commented Feb 3, 2026

🤖 Augment PR Summary

Summary: This PR extends DataFusion’s JSON reader to support both newline-delimited JSON (NDJSON) and JSON array files.

Changes:

  • Introduced a new newline_delimited JSON option (default true) in config, table options, and protobuf serialization.
  • Added JsonReadOptions and deprecated NdJsonReadOptions as an alias to maintain API continuity.
  • Updated JSON FileFormat schema inference to handle JSON arrays by converting them to NDJSON on-the-fly.
  • Propagated the format flag through listing/scan plumbing so execution uses the correct reader behavior.
  • Implemented streaming conversion for JSON array payloads via a new JsonArrayToNdjsonReader (plus a ChannelReader bridge for async object-store streams).
  • Added execution safeguards (JSON array does not support range-based scanning) with an explicit error message.
  • Updated examples and internal call sites to use JsonReadOptions.
  • Added comprehensive unit/integration coverage (format tests, datasource-json tests, and sqllogictest cases) and new JSON-array test data files.

Technical Notes: JSON array files are parsed by streaming conversion to NDJSON to avoid loading full arrays into memory, while NDJSON remains the default behavior.
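
For illustration, a minimal sketch of the async-to-sync bridging idea behind ChannelReader (illustrative names and channel type only, not the exact implementation in this PR):

use bytes::{Buf, Bytes};
use std::io::Read;
use std::sync::mpsc::Receiver;

// A blocking `Read` that pulls `Bytes` chunks from a channel fed by the async
// object-store task, reporting EOF once the sender side is dropped.
struct ChannelReaderSketch {
    rx: Receiver<Bytes>,
    current: Bytes,
}

impl Read for ChannelReaderSketch {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        // Refill from the channel whenever the current chunk is exhausted.
        while !self.current.has_remaining() {
            match self.rx.recv() {
                Ok(bytes) => self.current = bytes,
                Err(_) => return Ok(0), // sender dropped => EOF
            }
        }
        let len = buf.len().min(self.current.remaining());
        self.current.copy_to_slice(&mut buf[..len]);
        Ok(len)
    }
}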


@augmentcode augmentcode bot left a comment

Review completed. 3 suggestions posted.


Comment augment review to trigger a new review at any time.

if self.output_pos >= self.output_filled {
self.fill_output_buffer()?;
if self.output_filled == 0 {
return Ok(0); // EOF

At EOF this returns Ok(0) even if the input JSON array was never closed or had trailing non-whitespace content, which can make malformed JSON silently parse as valid NDJSON. Since validate_complete() already tracks this, consider surfacing an InvalidData error on EOF when state != Done or has_trailing_content is true.
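
One self-contained sketch of the suggested behavior (illustrative names only; the real converter would consult its own state via validate_complete() rather than a boolean flag):

use std::io::{Error, ErrorKind, Read, Result};

// Wrapper that reports malformed input at EOF instead of a clean Ok(0).
struct EofValidatingReader<R: Read> {
    inner: R,
    array_closed: bool, // stands in for `state == Done && !has_trailing_content`
}

impl<R: Read> Read for EofValidatingReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let n = self.inner.read(buf)?;
        if n == 0 && !self.array_closed {
            return Err(Error::new(
                ErrorKind::InvalidData,
                "Incomplete JSON array: input ended before the closing ']'",
            ));
        }
        Ok(n)
    }
}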

Severity: medium

Other Locations
  • datafusion/datasource-json/src/source.rs:316
  • datafusion/datasource-json/src/source.rs:394
  • datafusion/datasource-json/src/file_format.rs:226


Owner Author

value:useful; category:bug; feedback: The Augment AI reviewer is correct! Every use of JsonArrayToNdjsonReader to read a JSON array file/stream should be followed by a call to validate_complete() to make sure the content is a well-formed JSON array. This prevents reading/parsing JSON that does not follow the JSON specification.

while let Some(chunk) = decompressed_stream.next().await {
match chunk {
Ok(bytes) => {
if byte_tx.send(bytes).is_err() {

This std::sync::mpsc::sync_channel send call blocks the async task when the buffer is full, which can stall a Tokio worker thread under backpressure. Consider using an async channel (tokio::sync::mpsc) or moving the blocking send into a blocking context.
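
A minimal sketch of the first alternative (tokio::sync::mpsc), with illustrative names; error propagation is omitted here and covered by the other comments on surfacing read errors:

use bytes::Bytes;
use futures::{Stream, StreamExt};

// The async producer awaits `send`, yielding under backpressure instead of
// blocking a Tokio worker thread; a blocking consumer can call
// `Receiver::blocking_recv` from inside `spawn_blocking`.
async fn forward_chunks(
    mut stream: impl Stream<Item = Bytes> + Unpin,
    byte_tx: tokio::sync::mpsc::Sender<Bytes>,
) {
    while let Some(bytes) = stream.next().await {
        if byte_tx.send(bytes).await.is_err() {
            break; // consumer dropped the receiver
        }
    }
}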

Severity: medium


}
}
Err(e) => {
log::error!("Error reading JSON stream: {e}");

On read/decompression errors this path only logs and then ends the stream, which can look like a clean EOF and hide the real failure cause. Consider propagating the error to the consumer (e.g., via the result_tx channel) so queries fail with the underlying error.

Severity: medium


Owner Author

value:useful; category:bug; feedback: The Augment AI reviewer is correct! JSON array read errors are currently only logged and the caller is never informed, so using the partially parsed data later leads to another, misleading error. Propagating the error prevents it from being hidden until someone tries to use the problematic data.

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.


Err(e) => {
log::error!("Error reading JSON stream: {e}");
break;
}

Stream errors silently swallowed causing partial results

High Severity

When streaming JSON array format from remote storage (S3, etc.), errors from the object store stream are only logged and then the task breaks, but the error is never propagated to the consumer. When byte_tx is dropped, ChannelReader returns EOF (0 bytes), causing parse_task to complete normally with whatever partial data was processed. This results in queries silently returning incomplete results when network errors occur, rather than failing with an error. The read_task has no way to communicate errors to the consumer since result_tx is only accessible from parse_task.


Owner Author

value:useful; category:bug; feedback: The Bugbot AI reviewer is correct! JSON array read errors are currently only logged and the caller is never informed, so using the partially parsed data later leads to another, misleading error. Propagating the error prevents it from being hidden until someone tries to use the problematic data.

}
should_take
}))?;
(schema, count)

Duplicated NDJSON schema inference logic

Medium Severity

The NDJSON schema inference logic is duplicated between the File branch (lines 292-304) and Stream branch (lines 315-326) in infer_schema(). Both contain identical code using ValueIter, infer_json_schema_from_iterator, and record counting. A helper function like infer_schema_from_ndjson(reader, max_records) would eliminate this duplication, consistent with the existing infer_schema_from_json_array helper.


Owner Author

value:good-to-have; category:bug; feedback: The Bugbot AI reviewer is correct! There is duplicated code in the branches that process files and streams from the ObjectStore; it could be extracted into a helper function to avoid double maintenance and possible divergence.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@datafusion/datasource-json/src/source.rs`:
- Around line 370-388: The read task spawned by SpawnedTask::spawn currently
logs read errors and breaks, which hides failures from the consumer; change the
loop handling for decompressed_stream in the spawned task to send any Err(e)
into the existing error channel (error_tx) instead of only logging, then break
so the byte channel closes; ensure the surrounding code that builds the consumer
stream (e.g., ChannelReader / result stream) reads from error_rx and returns or
yields the error to callers so consumers see the read failure instead of a
silent EOF.

In `@datafusion/datasource-json/src/utils.rs`:
- Around line 109-133: The reader currently silently discards any bytes before
the opening '['; modify JsonArrayToNdjsonReader to detect and reject
non‑whitespace bytes that appear before the first '[' by (1) adding a boolean
flag (e.g., leading_non_whitespace) or reusing/augmenting an existing state to
record if any non‑whitespace was seen while in the pre‑array state, (2) set that
flag when scanning bytes before the initial '[' if a byte is not ASCII
whitespace, and (3) update validate_complete() to return an InvalidData Err when
the flag is set (i.e., trailing/preceding junk was observed) so inputs like
"junk[...]" are rejected; ensure the detection happens in the same code paths
that currently skip until '[' (refer to JsonArrayToNdjsonReader, its pre‑array
handling code and validate_complete()) and that tests covering leading junk are
added/updated.
- Around line 162-196: The JsonArrayToNdjsonReader instances are never validated
at the end of consumption, so malformed arrays slip through; call
JsonArrayToNdjsonReader::validate_complete() on each production path that
constructs and hands the reader off (i.e., the paths that pass the reader into
the Arrow ReaderBuilder and the path that wraps it with ValueIter) and propagate
any std::io::Error returned. Concretely: after the reader has been fully
consumed (or immediately before passing it to ReaderBuilder or constructing
ValueIter), invoke reader.validate_complete()? and return or map the error into
the surrounding Result so missing closing ']', unterminated strings, or trailing
content are reported. Ensure this is added to the buffered JSON array path, the
streaming JSON array path, and the schema inference path that uses ValueIter.
🧹 Nitpick comments (3)
datafusion/datasource-json/Cargo.toml (1)

46-50: Align log/serde_json to workspace dependencies.

Using explicit versions here can drift from workspace policy and create version skew. Prefer workspace = true for consistency.

🔧 Suggested change
-log = "0.4.29"
+log = { workspace = true }
 ...
-serde_json = "1.0.149"
+serde_json = { workspace = true }
datafusion/core/src/datasource/file_format/json.rs (1)

59-64: Use PathBuf::join for cross-platform temp paths.
String concatenation with "/" is less portable than Path::join.

Suggested change
-        let path = format!("{}/test.json", tmp_dir.path().to_string_lossy());
-        std::fs::write(&path, content).unwrap();
-        (tmp_dir, path)
+        let path = tmp_dir.path().join("test.json");
+        std::fs::write(&path, content).unwrap();
+        (tmp_dir, path.to_string_lossy().to_string())
datafusion/datasource-json/src/file_format.rs (1)

286-331: Consider extracting duplicated NDJSON schema inference logic.

The NDJSON inference code in the File branch (lines 292-304) and Stream branch (lines 315-326) is nearly identical. Consider extracting it into a helper function similar to infer_schema_from_json_array:

fn infer_schema_from_ndjson<R: Read>(reader: R, max_records: usize) -> Result<(Schema, usize)> {
    let iter = ValueIter::new(reader, None);
    let mut count = 0;
    let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
        let should_take = count < max_records;
        if should_take {
            count += 1;
        }
        should_take
    }))?;
    Ok((schema, count))
}

This would reduce code duplication and improve maintainability.

Comment on lines +370 to +388
// Async task: read from object store stream and send bytes to channel
// Store the SpawnedTask to keep it alive until stream is dropped
let read_task = SpawnedTask::spawn(async move {
    tokio::pin!(decompressed_stream);
    while let Some(chunk) = decompressed_stream.next().await {
        match chunk {
            Ok(bytes) => {
                if byte_tx.send(bytes).is_err() {
                    break; // Consumer dropped
                }
            }
            Err(e) => {
                log::error!("Error reading JSON stream: {e}");
                break;
            }
        }
    }
    // byte_tx dropped here, signals EOF to ChannelReader
});

⚠️ Potential issue | 🟡 Minor

Error in read task is silently dropped.

When an error occurs reading from the object store stream (line 381-384), the error is only logged and the task breaks. The consumer will see the stream end without knowing an error occurred. Consider propagating the error:

🛡️ Proposed fix to propagate read errors
-                        // Async task: read from object store stream and send bytes to channel
-                        // Store the SpawnedTask to keep it alive until stream is dropped
-                        let read_task = SpawnedTask::spawn(async move {
+                        // Channel for read errors
+                        let (error_tx, mut error_rx) = tokio::sync::oneshot::channel();
+                        
+                        let read_task = SpawnedTask::spawn(async move {
                             tokio::pin!(decompressed_stream);
                             while let Some(chunk) = decompressed_stream.next().await {
                                 match chunk {
                                     Ok(bytes) => {
                                         if byte_tx.send(bytes).is_err() {
                                             break; // Consumer dropped
                                         }
                                     }
                                     Err(e) => {
                                         log::error!("Error reading JSON stream: {e}");
+                                        let _ = error_tx.send(e);
                                         break;
                                     }
                                 }
                             }
                         });

Then check error_rx in the result stream to surface the error to the consumer.

🤖 Prompt for AI Agents
In `@datafusion/datasource-json/src/source.rs` around lines 370 - 388, The read
task spawned by SpawnedTask::spawn currently logs read errors and breaks, which
hides failures from the consumer; change the loop handling for
decompressed_stream in the spawned task to send any Err(e) into the existing
error channel (error_tx) instead of only logging, then break so the byte channel
closes; ensure the surrounding code that builds the consumer stream (e.g.,
ChannelReader / result stream) reads from error_rx and returns or yields the
error to callers so consumers see the read failure instead of a silent EOF.

Owner Author

value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! JSON array read errors are currently only logged and the caller is never informed, so using the partially parsed data later leads to another, misleading error. Propagating the error prevents it from being hidden until someone tries to use the problematic data.

Comment on lines +109 to +133
pub struct JsonArrayToNdjsonReader<R: Read> {
    /// Inner reader - we use R directly (not `BufReader<R>`) for SyncIoBridge compatibility
    inner: R,
    state: JsonArrayState,
    /// Tracks nesting depth of `{` and `[` to identify top-level commas
    depth: i32,
    /// Whether we're currently inside a JSON string
    in_string: bool,
    /// Whether the next character is escaped (after `\`)
    escape_next: bool,
    /// Input buffer - stores raw bytes read from inner reader
    input_buffer: Vec<u8>,
    /// Current read position in input buffer
    input_pos: usize,
    /// Number of valid bytes in input buffer
    input_filled: usize,
    /// Output buffer - stores transformed NDJSON bytes
    output_buffer: Vec<u8>,
    /// Current read position in output buffer
    output_pos: usize,
    /// Number of valid bytes in output buffer
    output_filled: usize,
    /// Whether trailing non-whitespace content was detected after ']'
    has_trailing_content: bool,
}

⚠️ Potential issue | 🟠 Major

Reject non‑whitespace before the opening '['.

Line 205 currently ignores any byte that isn’t [, even if it’s non‑whitespace. That means inputs like junk[{"a":1}] are silently accepted and the leading junk discarded, producing valid NDJSON from invalid JSON. Consider tracking leading non‑whitespace and surfacing InvalidData in validate_complete().

🔧 Suggested fix
 pub struct JsonArrayToNdjsonReader<R: Read> {
     /// Whether trailing non-whitespace content was detected after ']'
     has_trailing_content: bool,
+    /// Whether non-whitespace content was detected before '['
+    has_leading_content: bool,
 }
@@
     pub fn with_capacity(reader: R, capacity: usize) -> Self {
         Self {
@@
             has_trailing_content: false,
+            has_leading_content: false,
         }
     }
@@
     pub fn validate_complete(&self) -> std::io::Result<()> {
+        if self.has_leading_content {
+            return Err(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                "Malformed JSON array: unexpected content before '['",
+            ));
+        }
         if self.depth != 0 {
@@
     fn process_byte(&mut self, byte: u8) -> Option<u8> {
         match self.state {
             JsonArrayState::Start => {
                 // Looking for the opening '[', skip whitespace
                 if byte == b'[' {
                     self.state = JsonArrayState::InArray;
-                }
-                // Skip whitespace and the '[' itself
+                    return None;
+                }
+                // Track non-whitespace before '[' so we can reject invalid input
+                if !byte.is_ascii_whitespace() {
+                    self.has_leading_content = true;
+                }
                 None
             }

Also applies to: 145-159, 162-196, 199-210

🤖 Prompt for AI Agents
In `@datafusion/datasource-json/src/utils.rs` around lines 109 - 133, The reader
currently silently discards any bytes before the opening '['; modify
JsonArrayToNdjsonReader to detect and reject non‑whitespace bytes that appear
before the first '[' by (1) adding a boolean flag (e.g., leading_non_whitespace)
or reusing/augmenting an existing state to record if any non‑whitespace was seen
while in the pre‑array state, (2) set that flag when scanning bytes before the
initial '[' if a byte is not ASCII whitespace, and (3) update
validate_complete() to return an InvalidData Err when the flag is set (i.e.,
trailing/preceding junk was observed) so inputs like "junk[...]" are rejected;
ensure the detection happens in the same code paths that currently skip until
'[' (refer to JsonArrayToNdjsonReader, its pre‑array handling code and
validate_complete()) and that tests covering leading junk are added/updated.

Owner Author

value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The JSON array reader/parser checks for trailing content after the closing ']' character but does not do the same for leading content before the opening '['. Adding that check prevents reading invalid JSON array data.

Comment on lines +162 to +196
/// Check if the JSON array was properly terminated.
///
/// This should be called after all data has been read.
///
/// Returns an error if:
/// - Unbalanced braces/brackets (depth != 0)
/// - Unterminated string
/// - Missing closing `]`
/// - Unexpected trailing content after `]`
pub fn validate_complete(&self) -> std::io::Result<()> {
    if self.depth != 0 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "Malformed JSON array: unbalanced braces or brackets",
        ));
    }
    if self.in_string {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "Malformed JSON array: unterminated string",
        ));
    }
    if self.state != JsonArrayState::Done {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "Incomplete JSON array: expected closing bracket ']'",
        ));
    }
    if self.has_trailing_content {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            "Malformed JSON: unexpected trailing content after ']'",
        ));
    }
    Ok(())

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# Find construction and validation call sites for JsonArrayToNdjsonReader
rg -n "JsonArrayToNdjsonReader" -S
rg -n "validate_complete" -S

Repository: martin-augment/datafusion

Length of output: 4602


🏁 Script executed:

# Check source.rs around the reader construction sites
sed -n '310,370p' datafusion/datasource-json/src/source.rs

# Check file_format.rs around the reader construction site
sed -n '220,250p' datafusion/datasource-json/src/file_format.rs

Repository: martin-augment/datafusion

Length of output: 4108


🏁 Script executed:

# Look at the full implementation of JsonArrayToNdjsonReader
sed -n '109,200p' datafusion/datasource-json/src/utils.rs

# Also check the Read and BufRead trait implementations
sed -n '326,360p' datafusion/datasource-json/src/utils.rs

Repository: martin-augment/datafusion

Length of output: 4588


🏁 Script executed:

# Search for Drop implementation and any post-processing validation
sed -n '200,330p' datafusion/datasource-json/src/utils.rs

# Check how source.rs actually uses the arrow_reader after building
sed -n '310,330p' datafusion/datasource-json/src/source.rs

Repository: martin-augment/datafusion

Length of output: 5925


🏁 Script executed:

# Search for Drop implementation
rg "impl.*Drop.*JsonArrayToNdjsonReader" -A 20

# Also check if the issue is addressed in tests with invalid data
sed -n '566,595p' datafusion/datasource-json/src/utils.rs

Repository: martin-augment/datafusion

Length of output: 1362


Add validate_complete() call to all production code paths using JsonArrayToNdjsonReader.

Incomplete or malformed JSON arrays silently pass validation when validate_complete() is not called. The following production paths construct JsonArrayToNdjsonReader but never validate:

  • source.rs:316–320: JSON array buffered read path passes reader to Arrow ReaderBuilder without validation
  • source.rs:394: JSON array streaming path passes reader to Arrow ReaderBuilder without validation
  • file_format.rs:226: Schema inference path passes reader to ValueIter without validation

Missing closing ], unterminated strings, and trailing junk are only detected by validate_complete(). Without explicit calls after the reader is exhausted, invalid input completes successfully.

🤖 Prompt for AI Agents
In `@datafusion/datasource-json/src/utils.rs` around lines 162 - 196, The
JsonArrayToNdjsonReader instances are never validated at the end of consumption,
so malformed arrays slip through; call
JsonArrayToNdjsonReader::validate_complete() on each production path that
constructs and hands the reader off (i.e., the paths that pass the reader into
the Arrow ReaderBuilder and the path that wraps it with ValueIter) and propagate
any std::io::Error returned. Concretely: after the reader has been fully
consumed (or immediately before passing it to ReaderBuilder or constructing
ValueIter), invoke reader.validate_complete()? and return or map the error into
the surrounding Result so missing closing ']', unterminated strings, or trailing
content are reported. Ensure this is added to the buffered JSON array path, the
streaming JSON array path, and the schema inference path that uses ValueIter.

Owner Author

value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! Every use of JsonArrayToNdjsonReader to read a JSON array file/stream should be followed by a call to validate_complete() to make sure the content is a well-formed JSON array. This prevents reading/parsing JSON that does not follow the JSON specification.

@martin-augment
Owner Author

1. No Validation of Trailing Content
While code detects trailing content after ], validation isn't called in the main read path, so malformed files might be partially processed without error.

value:useful; category:bug; feedback: The Claude AI reviewer is correct! Every use of JsonArrayToNdjsonReader to read a JSON array file/stream should be followed by a call to validate_complete() to make sure the content is a well-formed JSON array. This prevents reading/parsing JSON that does not follow the JSON specification.

@martin-augment
Owner Author

3. Missing Validation (utils.rs:831-858)
The infer_schema_from_json_array function doesn't call validate_complete() after reading. Malformed JSON arrays might pass schema inference but fail during actual reading, leading to confusing errors.

value:useful; category:bug; feedback: The Claude AI reviewer is correct! Every use of JsonArrayToNdjsonReader to read a JSON array file/stream should be followed by a call to validate_complete() to make sure the content is a well-formed JSON array. This prevents reading/parsing JSON that does not follow the JSON specification.

@martin-augment
Owner Author

286-331: Consider extracting duplicated NDJSON schema inference logic.

The NDJSON inference code in the File branch (lines 292-304) and Stream branch (lines 315-326) is nearly identical. Consider extracting it into a helper function similar to infer_schema_from_json_array:

fn infer_schema_from_ndjson<R: Read>(reader: R, max_records: usize) -> Result<(Schema, usize)> {
    let iter = ValueIter::new(reader, None);
    let mut count = 0;
    let schema = infer_json_schema_from_iterator(iter.take_while(|_| {
        let should_take = count < max_records;
        if should_take {
            count += 1;
        }
        should_take
    }))?;
    Ok((schema, count))
}

This would reduce code duplication and improve maintainability.

value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! There is duplicated code in the branches that process files and streams from the ObjectStore; it could be extracted into a helper function to avoid double maintenance and possible divergence.

@martin-augment
Owner Author

46-50: Align log/serde_json to workspace dependencies.

Using explicit versions here can drift from workspace policy and create version skew. Prefer workspace = true for consistency.

value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! With workspace = true the crate uses the same dependency versions as all other crates in the workspace, preventing different versions of the log and serde_json crates from being pulled into one of the project crates.

@martin-augment
Owner Author

59-64: Use PathBuf::join for cross-platform temp paths. String concatenation with "/" is less portable than Path::join.

value:good-to-have; category:bug; feedback: The CodeRabbit AI reviewer is correct! Hard-coding '/' as the path separator will fail on Windows. It is better to use PathBuf::join(), which picks the correct separator for the runtime OS.
