@@ -125,6 +125,7 @@ async fn json_opener() -> Result<()> {
projected,
FileCompressionType::UNCOMPRESSED,
Arc::new(object_store),
true,
);

let scan_config = FileScanConfigBuilder::new(
19 changes: 19 additions & 0 deletions datafusion/common/src/config.rs
@@ -3065,6 +3065,25 @@ config_namespace! {
/// If not specified, the default level for the compression algorithm is used.
pub compression_level: Option<u32>, default = None
pub schema_infer_max_rec: Option<usize>, default = None
/// The JSON format to use when reading files.
///
/// When `true` (default), expects newline-delimited JSON (NDJSON):
/// ```text
/// {"key1": 1, "key2": "val"}
/// {"key1": 2, "key2": "vals"}
/// ```
///
/// When `false`, expects JSON array format:
/// ```text
/// [
/// {"key1": 1, "key2": "val"},
/// {"key1": 2, "key2": "vals"}
/// ]
/// ```
///
/// Note: JSON array format requires loading the entire file into memory.

Augment AI reviewer: The docs state that JSON array format "requires loading the entire file into memory", but the implementation uses streaming conversion for GetResultPayload::File paths; this note may be misleading for users trying to choose between formats.

Owner (author) reply: value:useful; category:bug; feedback: The Augment AI reviewer is correct! The new version of the proposed changes still loads the complete JSON array file into memory for GetResultPayload::Stream, which may lead to out-of-memory errors for huge files. The file is loaded as a stream from the ObjectStore, so it should be processed as a stream; that prevents out-of-memory failures when loading a big JSON array file.

/// For large files, newline-delimited format is recommended.
Comment on lines +3084 to +3085

Gemini AI reviewer (medium): The comment "Note: JSON array format requires loading the entire file into memory." is a bit misleading. The implementation uses a streaming converter (JsonArrayToNdjsonReader) for file-based sources, which is memory-efficient. While it is true that for Stream payloads the entire stream is buffered into memory first, for file-based sources (a very common case) it is streaming.

Consider rephrasing to be more precise. For example, you could mention that streaming sources might be buffered. The documentation in datafusion/datasource-json/src/file_format.rs is more accurate on this point: "Note: JSON array format is processed using streaming conversion, which is memory-efficient even for large files."

Owner (author) reply: value:useful; category:bug; feedback: The Gemini AI reviewer is correct! The new version of the proposed changes still loads the complete JSON array file into memory for GetResultPayload::Stream, which may lead to out-of-memory errors for huge files. The file is loaded as a stream from the ObjectStore, so it should be processed as a stream; that prevents out-of-memory failures when loading a big JSON array file.

pub newline_delimited: bool, default = true
Comment on lines +3068 to +3086
CodeRabbit AI reviewer: ⚠️ Potential issue | 🟡 Minor

Clarify the JSON array memory note.

The comment says JSON array format requires loading the entire file into memory. If the new JsonArrayToNdjsonReader streams, this wording is misleading. Consider softening it to "may be more memory-intensive than NDJSON" or aligning it with the actual behavior.

✏️ Suggested doc tweak
-        /// Note: JSON array format requires loading the entire file into memory.
-        /// For large files, newline-delimited format is recommended.
+        /// Note: JSON array parsing can be more memory-intensive than NDJSON.
+        /// For large files, NDJSON is recommended.
🤖 Prompt for AI Agents
In `datafusion/common/src/config.rs` around lines 3068-3086, the doc comment for the config field newline_delimited misleadingly states that JSON array format "requires loading the entire file into memory"; update the comment to reflect actual behavior by softening this claim and referencing JsonArrayToNdjsonReader: change the note to say JSON array format "can be more memory-intensive than NDJSON and may require buffering depending on the reader implementation (e.g., JsonArrayToNdjsonReader streams arrays to NDJSON to reduce memory use)" or similar, so readers understand that streaming mitigates the memory requirement while NDJSON is generally more memory-efficient for very large files; edit the comment attached to pub newline_delimited: bool, default = true accordingly.

Owner (author) reply: value:useful; category:bug; feedback: The CodeRabbit AI reviewer is correct! The new version of the proposed changes still loads the complete JSON array file into memory, which may lead to out-of-memory errors for huge files. The file is loaded as a stream from the ObjectStore, so it should be processed as a stream; that prevents out-of-memory failures when loading a big JSON array file.

}
}
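
The three review threads above converge on the same point: the GetResultPayload::Stream path still buffers the whole JSON array before conversion. Purely for illustration, here is a minimal, self-contained sketch of the kind of incremental array-to-NDJSON rewrite being discussed. It is not the PR's JsonArrayToNdjsonReader, and the function name is made up; it only shows that top-level commas can be turned into newlines with constant memory by tracking string, escape, and nesting state across chunks.

```rust
use std::io::{BufWriter, Read, Result, Write};

/// Rewrite a JSON array from `input` as newline-delimited JSON on `output`,
/// processing the bytes chunk by chunk instead of materializing the document.
fn json_array_to_ndjson<R: Read, W: Write>(mut input: R, output: W) -> Result<()> {
    let mut out = BufWriter::new(output);
    let mut buf = [0u8; 8 * 1024];
    let mut depth: i64 = 0;    // '['/'{' nesting depth; the outer array is depth 1
    let mut in_string = false; // currently inside a JSON string literal
    let mut escaped = false;   // previous byte was a backslash inside a string

    loop {
        let n = input.read(&mut buf)?;
        if n == 0 {
            break;
        }
        for &b in &buf[..n] {
            if in_string {
                out.write_all(&[b])?;
                if escaped {
                    escaped = false;
                } else if b == b'\\' {
                    escaped = true;
                } else if b == b'"' {
                    in_string = false;
                }
                continue;
            }
            match b {
                b'"' => {
                    in_string = true;
                    out.write_all(&[b])?;
                }
                b'[' | b'{' => {
                    depth += 1;
                    // Drop the '[' that opens the outer array itself.
                    if !(b == b'[' && depth == 1) {
                        out.write_all(&[b])?;
                    }
                }
                b']' | b'}' => {
                    // Drop the ']' that closes the outer array.
                    if !(b == b']' && depth == 1) {
                        out.write_all(&[b])?;
                    }
                    depth -= 1;
                }
                // Top-level commas separate array elements: emit a newline.
                b',' if depth == 1 => out.write_all(b"\n")?,
                // Skip insignificant whitespace between top-level elements.
                b' ' | b'\t' | b'\r' | b'\n' if depth <= 1 => {}
                _ => out.write_all(&[b])?,
            }
        }
    }
    out.flush()
}

fn main() -> Result<()> {
    let array = br#"[{"a": 1, "b": "x,y"}, {"a": 2, "b": null}]"#;
    let mut ndjson = Vec::new();
    json_array_to_ndjson(&array[..], &mut ndjson)?;
    // Prints:
    // {"a": 1, "b": "x,y"}
    // {"a": 2, "b": null}
    println!("{}", String::from_utf8_lossy(&ndjson));
    Ok(())
}
```

The same state machine could, in principle, be driven by object store byte chunks rather than a blocking Read, which is what the owner's replies ask for on the Stream path.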

246 changes: 243 additions & 3 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -47,12 +47,54 @@ mod tests {
use datafusion_common::stats::Precision;

use datafusion_common::Result;
use datafusion_datasource::file_compression_type::FileCompressionType;
use futures::StreamExt;
use insta::assert_snapshot;
use object_store::local::LocalFileSystem;
use regex::Regex;
use rstest::rstest;

// ==================== Test Helpers ====================

/// Create a temporary JSON file and return (TempDir, path)
fn create_temp_json(content: &str) -> (tempfile::TempDir, String) {
let tmp_dir = tempfile::TempDir::new().unwrap();
let path = format!("{}/test.json", tmp_dir.path().to_string_lossy());
std::fs::write(&path, content).unwrap();
(tmp_dir, path)
}

/// Infer schema from JSON array format file
async fn infer_json_array_schema(
content: &str,
) -> Result<arrow::datatypes::SchemaRef> {
let (_tmp_dir, path) = create_temp_json(content);
let session = SessionContext::new();
let ctx = session.state();
let store = Arc::new(LocalFileSystem::new()) as _;
let format = JsonFormat::default().with_newline_delimited(false);
format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await
}

/// Register a JSON array table and run a query
async fn query_json_array(content: &str, query: &str) -> Result<Vec<RecordBatch>> {
let (_tmp_dir, path) = create_temp_json(content);
let ctx = SessionContext::new();
let options = NdJsonReadOptions::default().newline_delimited(false);
ctx.register_json("test_table", &path, options).await?;
ctx.sql(query).await?.collect().await
}

/// Register a JSON array table and run a query, return formatted string
async fn query_json_array_str(content: &str, query: &str) -> Result<String> {
let result = query_json_array(content, query).await?;
Ok(batches_to_string(&result))
}

// ==================== Existing Tests ====================

#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
@@ -314,7 +356,6 @@
.digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into());

let mut all_batches = RecordBatch::new_empty(schema.clone());
// We get RequiresMoreData after 2 batches because of how json::Decoder works
for _ in 0..2 {
let output = deserializer.next()?;
let DeserializerOutput::RecordBatch(batch) = output else {
@@ -358,7 +399,6 @@
let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// Expected the file to exist and be empty
assert!(std::path::Path::new(&path).exists());
let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
@@ -385,10 +425,210 @@
let df = ctx.read_batch(empty_batch.clone())?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
// Expected the file to exist and be empty
assert!(std::path::Path::new(&path).exists());
let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
Ok(())
}

// ==================== JSON Array Format Tests ====================

#[tokio::test]
async fn test_json_array_schema_inference() -> Result<()> {
let schema = infer_json_array_schema(
r#"[{"a": 1, "b": 2.0, "c": true}, {"a": 2, "b": 3.5, "c": false}]"#,
)
.await?;

let fields: Vec<_> = schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect();
assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
Ok(())
}

#[tokio::test]
async fn test_json_array_empty() -> Result<()> {
let schema = infer_json_array_schema("[]").await?;
assert_eq!(schema.fields().len(), 0);
Ok(())
}

#[tokio::test]
async fn test_json_array_nested_struct() -> Result<()> {
let schema = infer_json_array_schema(
r#"[{"id": 1, "info": {"name": "Alice", "age": 30}}]"#,
)
.await?;

let info_field = schema.field_with_name("info").unwrap();
assert!(matches!(info_field.data_type(), DataType::Struct(_)));
Ok(())
}

#[tokio::test]
async fn test_json_array_list_type() -> Result<()> {
let schema =
infer_json_array_schema(r#"[{"id": 1, "tags": ["a", "b", "c"]}]"#).await?;

let tags_field = schema.field_with_name("tags").unwrap();
assert!(matches!(tags_field.data_type(), DataType::List(_)));
Ok(())
}

#[tokio::test]
async fn test_json_array_basic_query() -> Result<()> {
let result = query_json_array_str(
r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}, {"a": 3, "b": "test"}]"#,
"SELECT a, b FROM test_table ORDER BY a",
)
.await?;

assert_snapshot!(result, @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
| 3 | test |
+---+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_with_nulls() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "name": "Alice"}, {"id": 2, "name": null}, {"id": 3, "name": "Charlie"}]"#,
"SELECT id, name FROM test_table ORDER BY id",
)
.await?;

assert_snapshot!(result, @r"
+----+---------+
| id | name |
+----+---------+
| 1 | Alice |
| 2 | |
| 3 | Charlie |
+----+---------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_unnest() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]"#,
"SELECT id, unnest(values) as value FROM test_table ORDER BY id, value",
)
.await?;

assert_snapshot!(result, @r"
+----+-------+
| id | value |
+----+-------+
| 1 | 10 |
| 1 | 20 |
| 1 | 30 |
| 2 | 40 |
| 2 | 50 |
+----+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_unnest_struct() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": "B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#,
"SELECT id, unnest(orders)['product'] as product, unnest(orders)['qty'] as qty FROM test_table ORDER BY id, product",
)
.await?;

assert_snapshot!(result, @r"
+----+---------+-----+
| id | product | qty |
+----+---------+-----+
| 1 | A | 2 |
| 1 | B | 3 |
| 2 | C | 1 |
+----+---------+-----+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_nested_struct_access() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "dept": {"name": "Engineering", "head": "Alice"}}, {"id": 2, "dept": {"name": "Sales", "head": "Bob"}}]"#,
"SELECT id, dept['name'] as dept_name, dept['head'] as head FROM test_table ORDER BY id",
)
.await?;

assert_snapshot!(result, @r"
+----+-------------+-------+
| id | dept_name | head |
+----+-------------+-------+
| 1 | Engineering | Alice |
| 2 | Sales | Bob |
+----+-------------+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_with_compression() -> Result<()> {
use flate2::Compression;
use flate2::write::GzEncoder;
use std::io::Write;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy());

let file = std::fs::File::create(&path)?;
let mut encoder = GzEncoder::new(file, Compression::default());
encoder.write_all(
r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#.as_bytes(),
)?;
encoder.finish()?;

let ctx = SessionContext::new();
let options = NdJsonReadOptions::default()
.newline_delimited(false)
.file_compression_type(FileCompressionType::GZIP)
.file_extension(".json.gz");

ctx.register_json("test_table", &path, options).await?;
let result = ctx
.sql("SELECT a, b FROM test_table ORDER BY a")
.await?
.collect()
.await?;

assert_snapshot!(batches_to_string(&result), @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_list_of_structs() -> Result<()> {
let batches = query_json_array(
r#"[{"id": 1, "items": [{"name": "x", "price": 10.5}]}, {"id": 2, "items": []}]"#,
"SELECT id, items FROM test_table ORDER BY id",
)
.await?;

assert_eq!(1, batches.len());
assert_eq!(2, batches[0].num_rows());
Ok(())
}
}
44 changes: 42 additions & 2 deletions datafusion/core/src/datasource/file_format/options.rs
@@ -442,7 +442,9 @@ impl<'a> AvroReadOptions<'a> {
}
}

/// Options that control the reading of Line-delimited JSON files (NDJson)
/// Options that control the reading of JSON files.
///
/// Supports both newline-delimited JSON (NDJSON) and JSON array formats.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
@@ -465,6 +467,22 @@ pub struct NdJsonReadOptions<'a> {
pub infinite: bool,
/// Indicates how the file is sorted
pub file_sort_order: Vec<Vec<SortExpr>>,
/// Whether to read as newline-delimited JSON (default: true).
///
/// When `true` (default), expects newline-delimited JSON (NDJSON):
/// ```text
/// {"key1": 1, "key2": "val"}
/// {"key1": 2, "key2": "vals"}
/// ```
///
/// When `false`, expects JSON array format:
/// ```text
/// [
/// {"key1": 1, "key2": "val"},
/// {"key1": 2, "key2": "vals"}
/// ]
/// ```
pub newline_delimited: bool,
}

impl Default for NdJsonReadOptions<'_> {
@@ -477,6 +495,7 @@ impl Default for NdJsonReadOptions<'_> {
file_compression_type: FileCompressionType::UNCOMPRESSED,
infinite: false,
file_sort_order: vec![],
newline_delimited: true,
}
}
}
@@ -529,6 +548,26 @@ impl<'a> NdJsonReadOptions<'a> {
self.schema_infer_max_records = schema_infer_max_records;
self
}

/// Set whether to read as newline-delimited JSON.
///
/// When `true` (default), expects newline-delimited JSON (NDJSON):
/// ```text
/// {"key1": 1, "key2": "val"}
/// {"key1": 2, "key2": "vals"}
/// ```
///
/// When `false`, expects JSON array format:
/// ```text
/// [
/// {"key1": 1, "key2": "val"},
/// {"key1": 2, "key2": "vals"}
/// ]
/// ```
pub fn newline_delimited(mut self, newline_delimited: bool) -> Self {
self.newline_delimited = newline_delimited;
self
}
}
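
For orientation, a minimal end-to-end use of the new builder might look like the sketch below, mirroring the pattern used by the new tests in json.rs. The table name and file path are placeholders, and the example assumes the datafusion and tokio crates are available.

```rust
use datafusion::error::Result;
use datafusion::prelude::{NdJsonReadOptions, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // "data/records.json" is a placeholder path to a file holding one JSON array.
    let options = NdJsonReadOptions::default().newline_delimited(false);
    ctx.register_json("records", "data/records.json", options)
        .await?;
    ctx.sql("SELECT * FROM records LIMIT 5").await?.show().await?;
    Ok(())
}
```

Passing newline_delimited(false) is what switches register_json from NDJSON to JSON array parsing; the default remains true.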

#[async_trait]
@@ -663,7 +702,8 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
let file_format = JsonFormat::default()
.with_options(table_options.json)
.with_schema_infer_max_rec(self.schema_infer_max_records)
.with_file_compression_type(self.file_compression_type.to_owned());
.with_file_compression_type(self.file_compression_type.to_owned())
.with_newline_delimited(self.newline_delimited);

ListingOptions::new(Arc::new(file_format))
.with_file_extension(self.file_extension)