4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -188,6 +188,8 @@ strum_macros = "0.27.2"
tempfile = "3"
testcontainers-modules = { version = "0.14" }
tokio = { version = "1.48", features = ["macros", "rt", "sync"] }
tokio-stream = "0.1"
tokio-util = "0.7"
url = "2.5.7"
uuid = "1.20"
zstd = { version = "0.13", default-features = false }
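The two new dependencies, `tokio-stream` and `tokio-util`, are presumably pulled in to stream and decode whole-file JSON arrays, the capability this PR adds; the code that uses them is not part of the hunks shown here.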
@@ -125,6 +125,7 @@ async fn json_opener() -> Result<()> {
projected,
FileCompressionType::UNCOMPRESSED,
Arc::new(object_store),
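// argument added by this PR: presumably the newline-delimited flag (true = NDJSON, false = JSON array)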
true,
);

let scan_config = FileScanConfigBuilder::new(
16 changes: 16 additions & 0 deletions datafusion/common/src/config.rs
@@ -3065,6 +3065,22 @@ config_namespace! {
/// If not specified, the default level for the compression algorithm is used.
pub compression_level: Option<u32>, default = None
pub schema_infer_max_rec: Option<usize>, default = None
/// Whether JSON files are newline-delimited (NDJSON) or a single JSON array.
///
/// When `true` (default), expects newline-delimited JSON (NDJSON):
/// ```text
/// {"key1": 1, "key2": "val"}
/// {"key1": 2, "key2": "vals"}
/// ```
///
/// When `false`, expects JSON array format:
/// ```text
/// [
/// {"key1": 1, "key2": "val"},
/// {"key1": 2, "key2": "vals"}
/// ]
/// ```
pub newline_delimited: bool, default = true
}
}

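To make the new flag concrete, here is a minimal sketch of reading a JSON array file through the `JsonReadOptions` API this PR introduces; the import path follows the test module later in this diff, and `data/array.json` is an illustrative path:

```rust
use datafusion::error::Result;
use datafusion::execution::options::JsonReadOptions;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // The default is newline_delimited(true), i.e. NDJSON; `false` switches
    // the reader to the `[ {...}, {...} ]` array format described above.
    let options = JsonReadOptions::default().newline_delimited(false);
    let df = ctx.read_json("data/array.json", options).await?;
    df.show().await?;
    Ok(())
}
```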
2 changes: 1 addition & 1 deletion datafusion/core/src/dataframe/mod.rs
@@ -512,7 +512,7 @@ impl DataFrame {
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let ctx = SessionContext::new();
- /// let df = ctx.read_json("tests/data/unnest.json", NdJsonReadOptions::default()).await?;
+ /// let df = ctx.read_json("tests/data/unnest.json", JsonReadOptions::default()).await?;
/// // expand into multiple columns if it's json array, flatten field name if it's nested structure
/// let df = df.unnest_columns(&["b","c","d"])?;
/// let expected = vec![
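Here and throughout this PR, `NdJsonReadOptions` gives way to `JsonReadOptions`; the renamed options type covers both NDJSON and the new JSON array layout via the `newline_delimited` setter shown in the tests below.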
252 changes: 246 additions & 6 deletions datafusion/core/src/datasource/file_format/json.rs
@@ -25,7 +25,7 @@ mod tests {
use super::*;

use crate::datasource::file_format::test_util::scan_format;
- use crate::prelude::{NdJsonReadOptions, SessionConfig, SessionContext};
+ use crate::prelude::{SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
use arrow::array::RecordBatch;
use arrow_schema::Schema;
@@ -46,12 +46,54 @@
use datafusion_common::internal_err;
use datafusion_common::stats::Precision;

use crate::execution::options::JsonReadOptions;
use datafusion_common::Result;
use datafusion_datasource::file_compression_type::FileCompressionType;
use futures::StreamExt;
use insta::assert_snapshot;
use object_store::local::LocalFileSystem;
use regex::Regex;
use rstest::rstest;
// ==================== Test Helpers ====================

/// Create a temporary JSON file and return (TempDir, path)
fn create_temp_json(content: &str) -> (tempfile::TempDir, String) {
let tmp_dir = tempfile::TempDir::new().unwrap();
let path = format!("{}/test.json", tmp_dir.path().to_string_lossy());
std::fs::write(&path, content).unwrap();
(tmp_dir, path)
}

/// Infer schema from JSON array format file
async fn infer_json_array_schema(
content: &str,
) -> Result<arrow::datatypes::SchemaRef> {
let (_tmp_dir, path) = create_temp_json(content);
let session = SessionContext::new();
let ctx = session.state();
let store = Arc::new(LocalFileSystem::new()) as _;
let format = JsonFormat::default().with_newline_delimited(false);
format
.infer_schema(&ctx, &store, &[local_unpartitioned_file(&path)])
.await
}

/// Register a JSON array table and run a query
async fn query_json_array(content: &str, query: &str) -> Result<Vec<RecordBatch>> {
let (_tmp_dir, path) = create_temp_json(content);
let ctx = SessionContext::new();
let options = JsonReadOptions::default().newline_delimited(false);
ctx.register_json("test_table", &path, options).await?;
ctx.sql(query).await?.collect().await
}

/// Register a JSON array table and run a query, return formatted string
async fn query_json_array_str(content: &str, query: &str) -> Result<String> {
let result = query_json_array(content, query).await?;
Ok(batches_to_string(&result))
}

// ==================== Existing Tests ====================

#[tokio::test]
async fn read_small_batches() -> Result<()> {
@@ -208,7 +250,7 @@
let ctx = SessionContext::new_with_config(config);

let table_path = "tests/data/1.json";
- let options = NdJsonReadOptions::default();
+ let options = JsonReadOptions::default();

ctx.register_json("json_parallel", table_path, options)
.await?;
@@ -240,7 +282,7 @@
let ctx = SessionContext::new_with_config(config);

let table_path = "tests/data/empty.json";
- let options = NdJsonReadOptions::default();
+ let options = JsonReadOptions::default();

ctx.register_json("json_parallel_empty", table_path, options)
.await?;
@@ -314,7 +356,6 @@
.digest(r#"{ "c1": 11, "c2": 12, "c3": 13, "c4": 14, "c5": 15 }"#.into());

let mut all_batches = RecordBatch::new_empty(schema.clone());
- // We get RequiresMoreData after 2 batches because of how json::Decoder works
for _ in 0..2 {
let output = deserializer.next()?;
let DeserializerOutput::RecordBatch(batch) = output else {
@@ -358,7 +399,6 @@
let df = ctx.sql("SELECT CAST(1 AS BIGINT) AS id LIMIT 0").await?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
- // Expected the file to exist and be empty
assert!(std::path::Path::new(&path).exists());
let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
@@ -385,10 +425,210 @@
let df = ctx.read_batch(empty_batch.clone())?;
df.write_json(&path, crate::dataframe::DataFrameWriteOptions::new(), None)
.await?;
- // Expected the file to exist and be empty
assert!(std::path::Path::new(&path).exists());
let metadata = std::fs::metadata(&path)?;
assert_eq!(metadata.len(), 0);
Ok(())
}

// ==================== JSON Array Format Tests ====================

#[tokio::test]
async fn test_json_array_schema_inference() -> Result<()> {
let schema = infer_json_array_schema(
r#"[{"a": 1, "b": 2.0, "c": true}, {"a": 2, "b": 3.5, "c": false}]"#,
)
.await?;

let fields: Vec<_> = schema
.fields()
.iter()
.map(|f| format!("{}: {:?}", f.name(), f.data_type()))
.collect();
assert_eq!(vec!["a: Int64", "b: Float64", "c: Boolean"], fields);
Ok(())
}

#[tokio::test]
async fn test_json_array_empty() -> Result<()> {
let schema = infer_json_array_schema("[]").await?;
assert_eq!(schema.fields().len(), 0);
Ok(())
}

#[tokio::test]
async fn test_json_array_nested_struct() -> Result<()> {
let schema = infer_json_array_schema(
r#"[{"id": 1, "info": {"name": "Alice", "age": 30}}]"#,
)
.await?;

let info_field = schema.field_with_name("info").unwrap();
assert!(matches!(info_field.data_type(), DataType::Struct(_)));
Ok(())
}

#[tokio::test]
async fn test_json_array_list_type() -> Result<()> {
let schema =
infer_json_array_schema(r#"[{"id": 1, "tags": ["a", "b", "c"]}]"#).await?;

let tags_field = schema.field_with_name("tags").unwrap();
assert!(matches!(tags_field.data_type(), DataType::List(_)));
Ok(())
}

#[tokio::test]
async fn test_json_array_basic_query() -> Result<()> {
let result = query_json_array_str(
r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}, {"a": 3, "b": "test"}]"#,
"SELECT a, b FROM test_table ORDER BY a",
)
.await?;

assert_snapshot!(result, @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
| 3 | test |
+---+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_with_nulls() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "name": "Alice"}, {"id": 2, "name": null}, {"id": 3, "name": "Charlie"}]"#,
"SELECT id, name FROM test_table ORDER BY id",
)
.await?;

assert_snapshot!(result, @r"
+----+---------+
| id | name |
+----+---------+
| 1 | Alice |
| 2 | |
| 3 | Charlie |
+----+---------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_unnest() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "values": [10, 20, 30]}, {"id": 2, "values": [40, 50]}]"#,
"SELECT id, unnest(values) as value FROM test_table ORDER BY id, value",
)
.await?;

assert_snapshot!(result, @r"
+----+-------+
| id | value |
+----+-------+
| 1 | 10 |
| 1 | 20 |
| 1 | 30 |
| 2 | 40 |
| 2 | 50 |
+----+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_unnest_struct() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "orders": [{"product": "A", "qty": 2}, {"product": "B", "qty": 3}]}, {"id": 2, "orders": [{"product": "C", "qty": 1}]}]"#,
"SELECT id, unnest(orders)['product'] as product, unnest(orders)['qty'] as qty FROM test_table ORDER BY id, product",
)
.await?;

assert_snapshot!(result, @r"
+----+---------+-----+
| id | product | qty |
+----+---------+-----+
| 1 | A | 2 |
| 1 | B | 3 |
| 2 | C | 1 |
+----+---------+-----+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_nested_struct_access() -> Result<()> {
let result = query_json_array_str(
r#"[{"id": 1, "dept": {"name": "Engineering", "head": "Alice"}}, {"id": 2, "dept": {"name": "Sales", "head": "Bob"}}]"#,
"SELECT id, dept['name'] as dept_name, dept['head'] as head FROM test_table ORDER BY id",
)
.await?;

assert_snapshot!(result, @r"
+----+-------------+-------+
| id | dept_name | head |
+----+-------------+-------+
| 1 | Engineering | Alice |
| 2 | Sales | Bob |
+----+-------------+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_with_compression() -> Result<()> {
use flate2::Compression;
use flate2::write::GzEncoder;
use std::io::Write;

let tmp_dir = tempfile::TempDir::new()?;
let path = format!("{}/array.json.gz", tmp_dir.path().to_string_lossy());

let file = std::fs::File::create(&path)?;
let mut encoder = GzEncoder::new(file, Compression::default());
encoder.write_all(
r#"[{"a": 1, "b": "hello"}, {"a": 2, "b": "world"}]"#.as_bytes(),
)?;
encoder.finish()?;

let ctx = SessionContext::new();
let options = JsonReadOptions::default()
.newline_delimited(false)
.file_compression_type(FileCompressionType::GZIP)
.file_extension(".json.gz");

ctx.register_json("test_table", &path, options).await?;
let result = ctx
.sql("SELECT a, b FROM test_table ORDER BY a")
.await?
.collect()
.await?;

assert_snapshot!(batches_to_string(&result), @r"
+---+-------+
| a | b |
+---+-------+
| 1 | hello |
| 2 | world |
+---+-------+
");
Ok(())
}

#[tokio::test]
async fn test_json_array_list_of_structs() -> Result<()> {
let batches = query_json_array(
r#"[{"id": 1, "items": [{"name": "x", "price": 10.5}]}, {"id": 2, "items": []}]"#,
"SELECT id, items FROM test_table ORDER BY id",
)
.await?;

assert_eq!(1, batches.len());
assert_eq!(2, batches[0].num_rows());
Ok(())
}
}