From 4a3a28aaf5370f32c9eeac796bc58648870cfa8b Mon Sep 17 00:00:00 2001 From: comphead Date: Thu, 14 Aug 2025 16:57:39 -0700 Subject: [PATCH 01/11] feat: Array Literals nested support --- native/core/src/execution/planner.rs | 389 +++++++++++++++++++-------- 1 file changed, 272 insertions(+), 117 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index e9f5885bf2..8472587a37 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -28,7 +28,7 @@ use crate::{ }, }; use arrow::compute::CastOptions; -use arrow::datatypes::{DataType, Field, Schema, TimeUnit, DECIMAL128_MAX_PRECISION}; +use arrow::datatypes::{DataType, Field, FieldRef, Schema, TimeUnit, DECIMAL128_MAX_PRECISION}; use datafusion::functions_aggregate::bit_and_or_xor::{bit_and_udaf, bit_or_udaf, bit_xor_udaf}; use datafusion::functions_aggregate::count::count_udaf; use datafusion::functions_aggregate::min_max::max_udaf; @@ -87,15 +87,16 @@ use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; use arrow::array::{ - BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, - Int16Array, Int32Array, Int64Array, Int8Array, NullArray, StringBuilder, - TimestampMicrosecondArray, + Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, + Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, NullArray, + StringBuilder, TimestampMicrosecondArray, }; -use arrow::buffer::BooleanBuffer; +use arrow::buffer::{BooleanBuffer, OffsetBuffer}; use datafusion::common::utils::SingleRowListArrayBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::limit::GlobalLimitExec; +use datafusion_comet_proto::spark_expression::ListLiteral; use datafusion_comet_proto::spark_operator::SparkFilePartition; use datafusion_comet_proto::{ spark_expression::{ @@ -485,118 +486,10 @@ impl PhysicalPlanner { } }, Value::ListVal(values) => { - if let DataType::List(f) = data_type { - match f.data_type() { - DataType::Null => { - SingleRowListArrayBuilder::new(Arc::new(NullArray::new(values.clone().null_mask.len()))) - .build_list_scalar() - } - DataType::Boolean => { - let vals = values.clone(); - SingleRowListArrayBuilder::new(Arc::new(BooleanArray::new(BooleanBuffer::from(vals.boolean_values), Some(vals.null_mask.into())))) - .build_list_scalar() - } - DataType::Int8 => { - let vals = values.clone(); - SingleRowListArrayBuilder::new(Arc::new(Int8Array::new(vals.byte_values.iter().map(|&x| x as i8).collect::>().into(), Some(vals.null_mask.into())))) - .build_list_scalar() - } - DataType::Int16 => { - let vals = values.clone(); - SingleRowListArrayBuilder::new(Arc::new(Int16Array::new(vals.short_values.iter().map(|&x| x as i16).collect::>().into(), Some(vals.null_mask.into())))) - .build_list_scalar() - } - DataType::Int32 => { - let vals = values.clone(); - SingleRowListArrayBuilder::new(Arc::new(Int32Array::new(vals.int_values.into(), Some(vals.null_mask.into())))) - .build_list_scalar() - } - DataType::Int64 => { - let vals = values.clone(); - SingleRowListArrayBuilder::new(Arc::new(Int64Array::new(vals.long_values.into(), Some(vals.null_mask.into())))) - .build_list_scalar() - } - DataType::Float32 => { - let vals = values.clone(); - SingleRowListArrayBuilder::new(Arc::new(Float32Array::new(vals.float_values.into(), Some(vals.null_mask.into())))) - .build_list_scalar() - } - DataType::Float64 => { - let vals = values.clone(); - SingleRowListArrayBuilder::new(Arc::new(Float64Array::new(vals.double_values.into(), Some(vals.null_mask.into())))) - .build_list_scalar() - } - DataType::Timestamp(TimeUnit::Microsecond, None) => { - let vals = values.clone(); - SingleRowListArrayBuilder::new(Arc::new(TimestampMicrosecondArray::new(vals.long_values.into(), Some(vals.null_mask.into())))) - .build_list_scalar() - } - DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => { - let vals = values.clone(); - SingleRowListArrayBuilder::new(Arc::new(TimestampMicrosecondArray::new(vals.long_values.into(), Some(vals.null_mask.into())).with_timezone(Arc::clone(tz)))) - .build_list_scalar() - } - DataType::Date32 => { - let vals = values.clone(); - SingleRowListArrayBuilder::new(Arc::new(Date32Array::new(vals.int_values.into(), Some(vals.null_mask.into())))) - .build_list_scalar() - } - DataType::Binary => { - // Using a builder as it is cumbersome to create BinaryArray from a vector with nulls - // and calculate correct offsets - let vals = values.clone(); - let item_capacity = vals.string_values.len(); - let data_capacity = vals.string_values.first().map(|s| s.len() * item_capacity).unwrap_or(0); - let mut arr = BinaryBuilder::with_capacity(item_capacity, data_capacity); - - for (i, v) in vals.bytes_values.into_iter().enumerate() { - if vals.null_mask[i] { - arr.append_value(v); - } else { - arr.append_null(); - } - } - - SingleRowListArrayBuilder::new(Arc::new(arr.finish())) - .build_list_scalar() - } - DataType::Utf8 => { - // Using a builder as it is cumbersome to create StringArray from a vector with nulls - // and calculate correct offsets - let vals = values.clone(); - let item_capacity = vals.string_values.len(); - let data_capacity = vals.string_values.first().map(|s| s.len() * item_capacity).unwrap_or(0); - let mut arr = StringBuilder::with_capacity(item_capacity, data_capacity); - - for (i, v) in vals.string_values.into_iter().enumerate() { - if vals.null_mask[i] { - arr.append_value(v); - } else { - arr.append_null(); - } - } - - SingleRowListArrayBuilder::new(Arc::new(arr.finish())) - .build_list_scalar() - } - DataType::Decimal128(p, s) => { - let vals = values.clone(); - SingleRowListArrayBuilder::new(Arc::new(Decimal128Array::new(vals.decimal_values.into_iter().map(|v| { - let big_integer = BigInt::from_signed_bytes_be(&v); - big_integer.to_i128().ok_or_else(|| { - GeneralError(format!( - "Cannot parse {big_integer:?} as i128 for Decimal literal" - )) - }).unwrap() - }).collect::>().into(), Some(vals.null_mask.into())).with_precision_and_scale(*p, *s)?)).build_list_scalar() - } - dt => { - return Err(GeneralError(format!( - "DataType::List literal does not support {dt:?} type" - ))) - } - } + dbg!(&values); + if let DataType::List(f) = data_type { + SingleRowListArrayBuilder::new(literal_to_array_ref(f.data_type().clone(), values.clone())?).build_list_scalar() } else { return Err(GeneralError(format!( "Expected DataType::List but got {data_type:?}" @@ -2802,12 +2695,186 @@ fn create_case_expr( } } +pub(crate) fn build_list_from_literal(data: ListLiteral) -> ArrayRef { + // --- Base case: no nested children → leaf node --- + if data.list_values.is_empty() { + // Create an Int32Array from the leaf values + Arc::new(Int32Array::from(data.int_values.clone())) as ArrayRef + } else { + // --- Recursive case: has nested children --- + // Build a ListArray for each child recursively + let child_arrays: Vec = data + .list_values + .iter() + .map(|c| build_list_from_literal(c.clone())) + .collect(); + + // Convert Vec into Vec<&dyn Array> for concat() + let child_refs: Vec<&dyn arrow::array::Array> = + child_arrays.iter().map(|a| a.as_ref()).collect(); + + // Concatenate all child arrays' *values* into one array + // Example: [[1,2,3], [4,5,6]] → values = [1,2,3,4,5,6] + let concat = arrow::compute::concat(&child_refs).unwrap(); + + // --- Build offsets for the parent list --- + let mut offsets = Vec::with_capacity(child_arrays.len() + 1); + offsets.push(0); // first list always starts at 0 + let mut sum = 0; + for arr in &child_arrays { + sum += arr.len() as i32; // each child's length adds to total + offsets.push(sum); // store cumulative sum as next offset + } + + // Create and return the parent ListArray + Arc::new(ListArray::new( + // Field: item type matches the concatenated child's type + FieldRef::from(Field::new("item", concat.data_type().clone(), true)), + OffsetBuffer::new(offsets.into()), // where each sublist starts/ends + concat, // the flattened values array + None, // no null bitmap at this level + )) + } +} +fn literal_to_array_ref( + data_type: DataType, + list_literal: ListLiteral, +) -> Result { + let nulls = &list_literal.null_mask; + match data_type { + DataType::Null => Ok(Arc::new(NullArray::new(nulls.len()))), + DataType::Boolean => Ok(Arc::new(BooleanArray::new( + BooleanBuffer::from(list_literal.boolean_values), + Some(nulls.clone().into()), + ))), + DataType::Int8 => Ok(Arc::new(Int8Array::new( + list_literal + .byte_values + .iter() + .map(|&x| x as i8) + .collect::>() + .into(), + Some(nulls.clone().into()), + ))), + DataType::Int16 => Ok(Arc::new(Int16Array::new( + list_literal + .short_values + .iter() + .map(|&x| x as i16) + .collect::>() + .into(), + Some(nulls.clone().into()), + ))), + DataType::Int32 => Ok(Arc::new(Int32Array::new( + list_literal.int_values.into(), + Some(nulls.clone().into()), + ))), + DataType::Int64 => Ok(Arc::new(Int64Array::new( + list_literal.long_values.into(), + Some(nulls.clone().into()), + ))), + DataType::Float32 => Ok(Arc::new(Float32Array::new( + list_literal.float_values.into(), + Some(nulls.clone().into()), + ))), + DataType::Float64 => Ok(Arc::new(Float64Array::new( + list_literal.double_values.into(), + Some(nulls.clone().into()), + ))), + DataType::Date32 => Ok(Arc::new(Date32Array::new( + list_literal.int_values.into(), + Some(nulls.clone().into()), + ))), + DataType::Timestamp(TimeUnit::Microsecond, None) => { + Ok(Arc::new(TimestampMicrosecondArray::new( + list_literal.long_values.into(), + Some(nulls.clone().into()), + ))) + } + DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) => Ok(Arc::new( + TimestampMicrosecondArray::new( + list_literal.long_values.into(), + Some(nulls.clone().into()), + ) + .with_timezone(Arc::clone(&tz)), + )), + DataType::Binary => { + // Using a builder as it is cumbersome to create BinaryArray from a vector with nulls + // and calculate correct offsets + let item_capacity = list_literal.bytes_values.len(); + let data_capacity = list_literal + .bytes_values + .first() + .map(|s| s.len() * item_capacity) + .unwrap_or(0); + let mut arr = BinaryBuilder::with_capacity(item_capacity, data_capacity); + + for (i, v) in list_literal.bytes_values.into_iter().enumerate() { + if nulls[i] { + arr.append_value(v); + } else { + arr.append_null(); + } + } + + Ok(Arc::new(arr.finish())) + } + DataType::Utf8 => { + // Using a builder as it is cumbersome to create StringArray from a vector with nulls + // and calculate correct offsets + let item_capacity = list_literal.string_values.len(); + let data_capacity = list_literal + .string_values + .first() + .map(|s| s.len() * item_capacity) + .unwrap_or(0); + let mut arr = StringBuilder::with_capacity(item_capacity, data_capacity); + + for (i, v) in list_literal.string_values.into_iter().enumerate() { + if nulls[i] { + arr.append_value(v); + } else { + arr.append_null(); + } + } + + Ok(Arc::new(arr.finish())) + } + DataType::Decimal128(p, s) => Ok(Arc::new( + Decimal128Array::new( + list_literal + .decimal_values + .into_iter() + .map(|v| { + let big_integer = BigInt::from_signed_bytes_be(&v); + big_integer + .to_i128() + .ok_or_else(|| { + GeneralError(format!( + "Cannot parse {big_integer:?} as i128 for Decimal literal" + )) + }) + .unwrap() + }) + .collect::>() + .into(), + Some(nulls.clone().into()), + ) + .with_precision_and_scale(p, s)?, + )), + DataType::List(f) => Ok(Arc::new(build_list_from_literal(list_literal))), + dt => Err(GeneralError(format!( + "DataType::List literal does not support {dt:?} type" + ))), + } +} + #[cfg(test)] mod tests { use futures::{poll, StreamExt}; use std::{sync::Arc, task::Poll}; - use arrow::array::{Array, DictionaryArray, Int32Array, RecordBatch, StringArray}; + use arrow::array::{Array, DictionaryArray, Int32Array, ListArray, RecordBatch, StringArray}; use arrow::datatypes::{DataType, Field, Fields, Schema}; use datafusion::catalog::memory::DataSourceExec; use datafusion::datasource::listing::PartitionedFile; @@ -2825,9 +2892,11 @@ mod tests { use crate::execution::{operators::InputBatch, planner::PhysicalPlanner}; use crate::execution::operators::ExecutionError; + use crate::execution::planner::build_list_from_literal; use crate::parquet::parquet_support::SparkParquetOptions; use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; use datafusion_comet_proto::spark_expression::expr::ExprStruct; + use datafusion_comet_proto::spark_expression::ListLiteral; use datafusion_comet_proto::{ spark_expression::expr::ExprStruct::*, spark_expression::Expr, @@ -3610,4 +3679,90 @@ mod tests { assert_batches_eq!(expected, &[actual]); Ok(()) } + + #[tokio::test] + async fn test_literal_to_list() -> Result<(), DataFusionError> { + // [[[1, 2, 3], [4, 5, 6], [7, 8, 9, null], null], [10, null, 12], null] + let data = ListLiteral { + list_values: vec![ + ListLiteral { + list_values: vec![ + ListLiteral { + int_values: vec![1, 2, 3], + null_mask: vec![true, true, true], + ..Default::default() + }, + ListLiteral { + int_values: vec![4, 5, 6], + null_mask: vec![true, true, true], + ..Default::default() + }, + ListLiteral { + int_values: vec![7, 8, 9, 0], + null_mask: vec![true, true, true, false], + ..Default::default() + }, + ListLiteral { + ..Default::default() + }, + ], + null_mask: vec![true, true, true, false], + ..Default::default() + }, + ListLiteral { + list_values: vec![ListLiteral { + int_values: vec![10, 0, 11], + null_mask: vec![true, false, true], + ..Default::default() + }], + null_mask: vec![true], + ..Default::default() + }, + ListLiteral { + ..Default::default() + }, + ], + null_mask: vec![true, true, false], + ..Default::default() + }; + + let array = build_list_from_literal(data); + + // Top-level should be ListArray> + let list_outer = array.as_any().downcast_ref::().unwrap(); + assert_eq!(list_outer.len(), 2); + + // First outer element: ListArray + let first_elem = list_outer.value(0); + dbg!(&first_elem); + let list_inner = first_elem.as_any().downcast_ref::().unwrap(); + assert_eq!(list_inner.len(), 3); + + // Inner values + let v0 = list_inner.value(0); + dbg!(&v0); + let vals0 = v0.as_any().downcast_ref::().unwrap(); + assert_eq!(vals0.values(), &[1, 2, 3]); + + let v1 = list_inner.value(1); + let vals1 = v1.as_any().downcast_ref::().unwrap(); + assert_eq!(vals1.values(), &[4, 5, 6]); + + let v2 = list_inner.value(2); + let vals2 = v2.as_any().downcast_ref::().unwrap(); + assert_eq!(vals2.values(), &[7, 8, 9]); + + // Second outer element + let second_elem = list_outer.value(1); + let list_inner2 = second_elem.as_any().downcast_ref::().unwrap(); + assert_eq!(list_inner2.len(), 1); + + let v3 = list_inner2.value(0); + let vals3 = v3.as_any().downcast_ref::().unwrap(); + assert_eq!(vals3.values(), &[10, 11]); + + //println!("result 2 {:?}", build_array(&data)); + + Ok(()) + } } From 14d03830be21a2abc205d4de08edbe78ce3cc6ad Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 15 Aug 2025 16:37:06 -0700 Subject: [PATCH 02/11] impl --- native/core/src/execution/planner.rs | 124 +++++++++++++++------------ 1 file changed, 71 insertions(+), 53 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 8472587a37..8ee1dc7f97 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -2695,52 +2695,13 @@ fn create_case_expr( } } -pub(crate) fn build_list_from_literal(data: ListLiteral) -> ArrayRef { - // --- Base case: no nested children → leaf node --- - if data.list_values.is_empty() { - // Create an Int32Array from the leaf values - Arc::new(Int32Array::from(data.int_values.clone())) as ArrayRef - } else { - // --- Recursive case: has nested children --- - // Build a ListArray for each child recursively - let child_arrays: Vec = data - .list_values - .iter() - .map(|c| build_list_from_literal(c.clone())) - .collect(); - - // Convert Vec into Vec<&dyn Array> for concat() - let child_refs: Vec<&dyn arrow::array::Array> = - child_arrays.iter().map(|a| a.as_ref()).collect(); - - // Concatenate all child arrays' *values* into one array - // Example: [[1,2,3], [4,5,6]] → values = [1,2,3,4,5,6] - let concat = arrow::compute::concat(&child_refs).unwrap(); - - // --- Build offsets for the parent list --- - let mut offsets = Vec::with_capacity(child_arrays.len() + 1); - offsets.push(0); // first list always starts at 0 - let mut sum = 0; - for arr in &child_arrays { - sum += arr.len() as i32; // each child's length adds to total - offsets.push(sum); // store cumulative sum as next offset - } - - // Create and return the parent ListArray - Arc::new(ListArray::new( - // Field: item type matches the concatenated child's type - FieldRef::from(Field::new("item", concat.data_type().clone(), true)), - OffsetBuffer::new(offsets.into()), // where each sublist starts/ends - concat, // the flattened values array - None, // no null bitmap at this level - )) - } -} fn literal_to_array_ref( data_type: DataType, list_literal: ListLiteral, ) -> Result { let nulls = &list_literal.null_mask; + dbg!(&data_type); + dbg!(&list_literal); match data_type { DataType::Null => Ok(Arc::new(NullArray::new(nulls.len()))), DataType::Boolean => Ok(Arc::new(BooleanArray::new( @@ -2862,7 +2823,44 @@ fn literal_to_array_ref( ) .with_precision_and_scale(p, s)?, )), - DataType::List(f) => Ok(Arc::new(build_list_from_literal(list_literal))), + // list of primitive types + DataType::List(f) if !matches!(f.data_type(), DataType::List(_)) => { + literal_to_array_ref(f.data_type().clone(), list_literal) + } + DataType::List(f) => { + let dt = f.data_type().clone(); + let child_arrays: Vec = list_literal + .list_values + .iter() + .map(|c| literal_to_array_ref(dt.clone(), c.clone()).unwrap()) + .collect(); + + // Convert Vec into Vec<&dyn Array> for concat() + let child_refs: Vec<&dyn arrow::array::Array> = + child_arrays.iter().map(|a| a.as_ref()).collect(); + + // Concatenate all child arrays' *values* into one array + // Example: [[1,2,3], [4,5,6]] → values = [1,2,3,4,5,6] + let concat = arrow::compute::concat(&child_refs).unwrap(); + + // --- Build offsets for the parent list --- + let mut offsets = Vec::with_capacity(child_arrays.len() + 1); + offsets.push(0); // first list always starts at 0 + let mut sum = 0; + for arr in &child_arrays { + sum += arr.len() as i32; // each child's length adds to total + offsets.push(sum); // store cumulative sum as next offset + } + + // Create and return the parent ListArray + Ok(Arc::new(ListArray::new( + // Field: item type matches the concatenated child's type + FieldRef::from(Field::new("item", concat.data_type().clone(), true)), + OffsetBuffer::new(offsets.into()), // where each sublist starts/ends + concat, // the flattened values array + None, // no null bitmap at this level + ))) + } dt => Err(GeneralError(format!( "DataType::List literal does not support {dt:?} type" ))), @@ -2875,7 +2873,7 @@ mod tests { use std::{sync::Arc, task::Poll}; use arrow::array::{Array, DictionaryArray, Int32Array, ListArray, RecordBatch, StringArray}; - use arrow::datatypes::{DataType, Field, Fields, Schema}; + use arrow::datatypes::{DataType, Field, FieldRef, Fields, Schema}; use datafusion::catalog::memory::DataSourceExec; use datafusion::datasource::listing::PartitionedFile; use datafusion::datasource::object_store::ObjectStoreUrl; @@ -2892,7 +2890,7 @@ mod tests { use crate::execution::{operators::InputBatch, planner::PhysicalPlanner}; use crate::execution::operators::ExecutionError; - use crate::execution::planner::build_list_from_literal; + use crate::execution::planner::literal_to_array_ref; use crate::parquet::parquet_support::SparkParquetOptions; use crate::parquet::schema_adapter::SparkSchemaAdapterFactory; use datafusion_comet_proto::spark_expression::expr::ExprStruct; @@ -3702,11 +3700,11 @@ mod tests { null_mask: vec![true, true, true, false], ..Default::default() }, - ListLiteral { - ..Default::default() - }, + // ListLiteral { + // ..Default::default() + // }, ], - null_mask: vec![true, true, true, false], + null_mask: vec![true, true, true], ..Default::default() }, ListLiteral { @@ -3718,15 +3716,35 @@ mod tests { null_mask: vec![true], ..Default::default() }, - ListLiteral { - ..Default::default() - }, + // ListLiteral { + // ..Default::default() + // }, ], - null_mask: vec![true, true, false], + null_mask: vec![true, true], ..Default::default() }; - let array = build_list_from_literal(data); + let nested_type = DataType::List(FieldRef::from(Field::new( + "item", + DataType::List( + Field::new( + "item", + DataType::List( + Field::new( + "item", + DataType::Int32, + true, // Int32 nullable + ) + .into(), + ), + true, // inner list nullable + ) + .into(), + ), + true, // outer list nullable + ))); + + let array = literal_to_array_ref(nested_type, data)?; // Top-level should be ListArray> let list_outer = array.as_any().downcast_ref::().unwrap(); From b10fd5177d4e60072b80285ddacc05143cf16525 Mon Sep 17 00:00:00 2001 From: comphead Date: Fri, 15 Aug 2025 16:43:07 -0700 Subject: [PATCH 03/11] impl --- native/core/src/execution/planner.rs | 37 +++++++++++++++++++--------- 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 8ee1dc7f97..a7a4a4e110 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -3680,7 +3680,20 @@ mod tests { #[tokio::test] async fn test_literal_to_list() -> Result<(), DataFusionError> { - // [[[1, 2, 3], [4, 5, 6], [7, 8, 9, null], null], [10, null, 12], null] + /* + [ + [ + [1, 2, 3], + [4, 5, 6], + [7, 8, 9, null], + null + ], + [ + [10, null, 12] + ], + null + ] + */ let data = ListLiteral { list_values: vec![ ListLiteral { @@ -3700,11 +3713,11 @@ mod tests { null_mask: vec![true, true, true, false], ..Default::default() }, - // ListLiteral { - // ..Default::default() - // }, + ListLiteral { + ..Default::default() + }, ], - null_mask: vec![true, true, true], + null_mask: vec![true, true, true, false], ..Default::default() }, ListLiteral { @@ -3716,11 +3729,11 @@ mod tests { null_mask: vec![true], ..Default::default() }, - // ListLiteral { - // ..Default::default() - // }, + ListLiteral { + ..Default::default() + }, ], - null_mask: vec![true, true], + null_mask: vec![true, true, false], ..Default::default() }; @@ -3754,7 +3767,7 @@ mod tests { let first_elem = list_outer.value(0); dbg!(&first_elem); let list_inner = first_elem.as_any().downcast_ref::().unwrap(); - assert_eq!(list_inner.len(), 3); + assert_eq!(list_inner.len(), 4); // Inner values let v0 = list_inner.value(0); @@ -3768,7 +3781,7 @@ mod tests { let v2 = list_inner.value(2); let vals2 = v2.as_any().downcast_ref::().unwrap(); - assert_eq!(vals2.values(), &[7, 8, 9]); + assert_eq!(vals2.values(), &[7, 8, 9, 0]); // Second outer element let second_elem = list_outer.value(1); @@ -3777,7 +3790,7 @@ mod tests { let v3 = list_inner2.value(0); let vals3 = v3.as_any().downcast_ref::().unwrap(); - assert_eq!(vals3.values(), &[10, 11]); + assert_eq!(vals3.values(), &[10, 0, 11]); //println!("result 2 {:?}", build_array(&data)); From 9f45cee09273a4d7af0185311e72c5058c9adaba Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 16 Aug 2025 10:40:16 -0700 Subject: [PATCH 04/11] impl --- native/core/src/execution/planner.rs | 72 +++++++++++++++++++--------- 1 file changed, 50 insertions(+), 22 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index a7a4a4e110..f0e086986b 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -91,7 +91,7 @@ use arrow::array::{ Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, NullArray, StringBuilder, TimestampMicrosecondArray, }; -use arrow::buffer::{BooleanBuffer, OffsetBuffer}; +use arrow::buffer::{BooleanBuffer, NullBuffer, OffsetBuffer}; use datafusion::common::utils::SingleRowListArrayBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; use datafusion::physical_plan::filter::FilterExec; @@ -486,10 +486,8 @@ impl PhysicalPlanner { } }, Value::ListVal(values) => { - dbg!(&values); - - if let DataType::List(f) = data_type { - SingleRowListArrayBuilder::new(literal_to_array_ref(f.data_type().clone(), values.clone())?).build_list_scalar() + if let DataType::List(ref f) = data_type { + SingleRowListArrayBuilder::new(literal_to_array_ref(data_type, values.clone())?).build_list_scalar() } else { return Err(GeneralError(format!( "Expected DataType::List but got {data_type:?}" @@ -2836,29 +2834,29 @@ fn literal_to_array_ref( .collect(); // Convert Vec into Vec<&dyn Array> for concat() - let child_refs: Vec<&dyn arrow::array::Array> = + let child_refs: Vec<&dyn Array> = child_arrays.iter().map(|a| a.as_ref()).collect(); - // Concatenate all child arrays' *values* into one array - // Example: [[1,2,3], [4,5,6]] → values = [1,2,3,4,5,6] - let concat = arrow::compute::concat(&child_refs).unwrap(); - // --- Build offsets for the parent list --- let mut offsets = Vec::with_capacity(child_arrays.len() + 1); - offsets.push(0); // first list always starts at 0 + offsets.push(0); // the first list always starts at 0 let mut sum = 0; for arr in &child_arrays { - sum += arr.len() as i32; // each child's length adds to total - offsets.push(sum); // store cumulative sum as next offset + sum += arr.len() as i32; // each child's length adds to the total + offsets.push(sum); // store cumulative sum as the next offset } + // Concatenate all child arrays' *values* into one array + // Example: [[1,2,3], [4,5,6]] → values = [1,2,3,4,5,6] + let concat = arrow::compute::concat(&child_refs)?; + // Create and return the parent ListArray Ok(Arc::new(ListArray::new( // Field: item type matches the concatenated child's type FieldRef::from(Field::new("item", concat.data_type().clone(), true)), - OffsetBuffer::new(offsets.into()), // where each sublist starts/ends - concat, // the flattened values array - None, // no null bitmap at this level + OffsetBuffer::new(offsets.into()), + concat, + Some(NullBuffer::from(list_literal.null_mask)), ))) } dt => Err(GeneralError(format!( @@ -3716,8 +3714,11 @@ mod tests { ListLiteral { ..Default::default() }, + ListLiteral { + ..Default::default() + }, ], - null_mask: vec![true, true, true, false], + null_mask: vec![true, true, true, false, true], ..Default::default() }, ListLiteral { @@ -3729,11 +3730,11 @@ mod tests { null_mask: vec![true], ..Default::default() }, - ListLiteral { - ..Default::default() - }, + // ListLiteral { + // ..Default::default() + // }, ], - null_mask: vec![true, true, false], + null_mask: vec![true, true], ..Default::default() }; @@ -3757,8 +3758,35 @@ mod tests { true, // outer list nullable ))); + let data = ListLiteral { + list_values: vec![ + ListLiteral { + int_values: vec![1, 2], + null_mask: vec![true, true], + ..Default::default() + }, + ListLiteral { + ..Default::default() + } + ], + null_mask: vec![true, false], + ..Default::default() + }; + + let nested_type = DataType::List(FieldRef::from(Field::new( + "item", + DataType::List( + Field::new("item", DataType::Int32, true).into(), + ), true))); + let array = literal_to_array_ref(nested_type, data)?; + dbg!(&array); + dbg!(&array.nulls()); + + + + // Top-level should be ListArray> let list_outer = array.as_any().downcast_ref::().unwrap(); assert_eq!(list_outer.len(), 2); @@ -3767,7 +3795,7 @@ mod tests { let first_elem = list_outer.value(0); dbg!(&first_elem); let list_inner = first_elem.as_any().downcast_ref::().unwrap(); - assert_eq!(list_inner.len(), 4); + assert_eq!(list_inner.len(), 5); // Inner values let v0 = list_inner.value(0); From 26b0b95d9c38dcd7cc3b040767ee8d4e12ddd051 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 18 Aug 2025 12:04:25 -0700 Subject: [PATCH 05/11] feat: Array Literals nested support --- native/core/src/execution/planner.rs | 61 +++++++++++++++------------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index f0e086986b..0f11d1f892 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -86,11 +86,7 @@ use datafusion::physical_expr::window::WindowExpr; use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; -use arrow::array::{ - Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, - Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, NullArray, - StringBuilder, TimestampMicrosecondArray, -}; +use arrow::array::{new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, NullArray, StringBuilder, TimestampMicrosecondArray}; use arrow::buffer::{BooleanBuffer, NullBuffer, OffsetBuffer}; use datafusion::common::utils::SingleRowListArrayBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; @@ -2825,7 +2821,7 @@ fn literal_to_array_ref( DataType::List(f) if !matches!(f.data_type(), DataType::List(_)) => { literal_to_array_ref(f.data_type().clone(), list_literal) } - DataType::List(f) => { + DataType::List(ref f) => { let dt = f.data_type().clone(); let child_arrays: Vec = list_literal .list_values @@ -2848,14 +2844,21 @@ fn literal_to_array_ref( // Concatenate all child arrays' *values* into one array // Example: [[1,2,3], [4,5,6]] → values = [1,2,3,4,5,6] - let concat = arrow::compute::concat(&child_refs)?; + let output_array = if !child_refs.is_empty() { + arrow::compute::concat(&child_refs)? + } else { + let x = new_empty_array(&dt.clone()); + offsets = vec![x.offset() as i32]; + dbg!(&offsets); + x + }; // Create and return the parent ListArray Ok(Arc::new(ListArray::new( // Field: item type matches the concatenated child's type - FieldRef::from(Field::new("item", concat.data_type().clone(), true)), + FieldRef::from(Field::new("item", output_array.data_type().clone(), true)), OffsetBuffer::new(offsets.into()), - concat, + output_array, Some(NullBuffer::from(list_literal.null_mask)), ))) } @@ -3758,26 +3761,26 @@ mod tests { true, // outer list nullable ))); - let data = ListLiteral { - list_values: vec![ - ListLiteral { - int_values: vec![1, 2], - null_mask: vec![true, true], - ..Default::default() - }, - ListLiteral { - ..Default::default() - } - ], - null_mask: vec![true, false], - ..Default::default() - }; - - let nested_type = DataType::List(FieldRef::from(Field::new( - "item", - DataType::List( - Field::new("item", DataType::Int32, true).into(), - ), true))); + // let data = ListLiteral { + // list_values: vec![ + // ListLiteral { + // int_values: vec![1, 2], + // null_mask: vec![true, true], + // ..Default::default() + // }, + // ListLiteral { + // ..Default::default() + // } + // ], + // null_mask: vec![true, false], + // ..Default::default() + // }; + // + // let nested_type = DataType::List(FieldRef::from(Field::new( + // "item", + // DataType::List( + // Field::new("item", DataType::Int32, true).into(), + // ), true))); let array = literal_to_array_ref(nested_type, data)?; From c0199c7fdecc9052872c3692f0d0cf263faf8375 Mon Sep 17 00:00:00 2001 From: comphead Date: Sun, 31 Aug 2025 16:48:14 -0700 Subject: [PATCH 06/11] null fix --- native/core/src/execution/planner.rs | 90 +++++++++++----------------- 1 file changed, 34 insertions(+), 56 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 0f11d1f892..fcc6c5589b 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -2823,43 +2823,40 @@ fn literal_to_array_ref( } DataType::List(ref f) => { let dt = f.data_type().clone(); - let child_arrays: Vec = list_literal - .list_values - .iter() - .map(|c| literal_to_array_ref(dt.clone(), c.clone()).unwrap()) - .collect(); - - // Convert Vec into Vec<&dyn Array> for concat() - let child_refs: Vec<&dyn Array> = - child_arrays.iter().map(|a| a.as_ref()).collect(); - - // --- Build offsets for the parent list --- - let mut offsets = Vec::with_capacity(child_arrays.len() + 1); - offsets.push(0); // the first list always starts at 0 - let mut sum = 0; - for arr in &child_arrays { - sum += arr.len() as i32; // each child's length adds to the total - offsets.push(sum); // store cumulative sum as the next offset + + // Build offsets and collect non-null child arrays + let mut offsets = Vec::with_capacity(list_literal.list_values.len() + 1); + offsets.push(0i32); + let mut child_arrays: Vec = Vec::new(); + + for (i, child_literal) in list_literal.list_values.iter().enumerate() { + if list_literal.null_mask[i] { + // Non-null entry: process the child array + let child_array = literal_to_array_ref(dt.clone(), child_literal.clone())?; + let len = child_array.len() as i32; + offsets.push(offsets.last().unwrap() + len); + child_arrays.push(child_array); + } else { + // Null entry: just repeat the last offset (empty slot) + offsets.push(*offsets.last().unwrap()); + } } - // Concatenate all child arrays' *values* into one array - // Example: [[1,2,3], [4,5,6]] → values = [1,2,3,4,5,6] - let output_array = if !child_refs.is_empty() { + // Concatenate all non-null child arrays' values into one array + let output_array = if !child_arrays.is_empty() { + let child_refs: Vec<&dyn Array> = child_arrays.iter().map(|a| a.as_ref()).collect(); arrow::compute::concat(&child_refs)? } else { - let x = new_empty_array(&dt.clone()); - offsets = vec![x.offset() as i32]; - dbg!(&offsets); - x + // All entries are null or the list is empty + new_empty_array(&dt) }; // Create and return the parent ListArray Ok(Arc::new(ListArray::new( - // Field: item type matches the concatenated child's type FieldRef::from(Field::new("item", output_array.data_type().clone(), true)), OffsetBuffer::new(offsets.into()), output_array, - Some(NullBuffer::from(list_literal.null_mask)), + Some(NullBuffer::from(list_literal.null_mask.clone())), ))) } dt => Err(GeneralError(format!( @@ -3687,12 +3684,14 @@ mod tests { [1, 2, 3], [4, 5, 6], [7, 8, 9, null], + [], null ], [ [10, null, 12] ], - null + null, + [] ] */ let data = ListLiteral { @@ -3733,11 +3732,14 @@ mod tests { null_mask: vec![true], ..Default::default() }, - // ListLiteral { - // ..Default::default() - // }, + ListLiteral { + ..Default::default() + }, + ListLiteral { + ..Default::default() + }, ], - null_mask: vec![true, true], + null_mask: vec![true, true, false, true], ..Default::default() }; @@ -3761,38 +3763,14 @@ mod tests { true, // outer list nullable ))); - // let data = ListLiteral { - // list_values: vec![ - // ListLiteral { - // int_values: vec![1, 2], - // null_mask: vec![true, true], - // ..Default::default() - // }, - // ListLiteral { - // ..Default::default() - // } - // ], - // null_mask: vec![true, false], - // ..Default::default() - // }; - // - // let nested_type = DataType::List(FieldRef::from(Field::new( - // "item", - // DataType::List( - // Field::new("item", DataType::Int32, true).into(), - // ), true))); - let array = literal_to_array_ref(nested_type, data)?; dbg!(&array); dbg!(&array.nulls()); - - - // Top-level should be ListArray> let list_outer = array.as_any().downcast_ref::().unwrap(); - assert_eq!(list_outer.len(), 2); + assert_eq!(list_outer.len(), 4); // First outer element: ListArray let first_elem = list_outer.value(0); From 2af1b3c978ef2c9dde393d743af00051a6ec21a4 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 1 Sep 2025 20:46:15 -0700 Subject: [PATCH 07/11] feat: ArrayType nested literal --- native/core/src/execution/planner.rs | 54 +++++++++---------- .../comet/exec/CometNativeReaderSuite.scala | 8 +++ 2 files changed, 33 insertions(+), 29 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index fcc6c5589b..35e553a6ad 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -86,7 +86,11 @@ use datafusion::physical_expr::window::WindowExpr; use datafusion::physical_expr::LexOrdering; use crate::parquet::parquet_exec::init_datasource_exec; -use arrow::array::{new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, NullArray, StringBuilder, TimestampMicrosecondArray}; +use arrow::array::{ + new_empty_array, Array, ArrayRef, BinaryBuilder, BooleanArray, Date32Array, Decimal128Array, + Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, ListArray, + NullArray, StringBuilder, TimestampMicrosecondArray, +}; use arrow::buffer::{BooleanBuffer, NullBuffer, OffsetBuffer}; use datafusion::common::utils::SingleRowListArrayBuilder; use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec; @@ -482,7 +486,7 @@ impl PhysicalPlanner { } }, Value::ListVal(values) => { - if let DataType::List(ref f) = data_type { + if let DataType::List(_) = data_type { SingleRowListArrayBuilder::new(literal_to_array_ref(data_type, values.clone())?).build_list_scalar() } else { return Err(GeneralError(format!( @@ -2694,8 +2698,6 @@ fn literal_to_array_ref( list_literal: ListLiteral, ) -> Result { let nulls = &list_literal.null_mask; - dbg!(&data_type); - dbg!(&list_literal); match data_type { DataType::Null => Ok(Arc::new(NullArray::new(nulls.len()))), DataType::Boolean => Ok(Arc::new(BooleanArray::new( @@ -2823,14 +2825,15 @@ fn literal_to_array_ref( } DataType::List(ref f) => { let dt = f.data_type().clone(); - + // Build offsets and collect non-null child arrays let mut offsets = Vec::with_capacity(list_literal.list_values.len() + 1); offsets.push(0i32); let mut child_arrays: Vec = Vec::new(); - + for (i, child_literal) in list_literal.list_values.iter().enumerate() { - if list_literal.null_mask[i] { + // Check if the current child literal is non-null and not the empty array + if list_literal.null_mask[i] && *child_literal != ListLiteral::default() { // Non-null entry: process the child array let child_array = literal_to_array_ref(dt.clone(), child_literal.clone())?; let len = child_array.len() as i32; @@ -3679,21 +3682,21 @@ mod tests { #[tokio::test] async fn test_literal_to_list() -> Result<(), DataFusionError> { /* - [ - [ - [1, 2, 3], - [4, 5, 6], - [7, 8, 9, null], - [], - null - ], - [ - [10, null, 12] - ], - null, - [] - ] - */ + [ + [ + [1, 2, 3], + [4, 5, 6], + [7, 8, 9, null], + [], + null + ], + [ + [10, null, 12] + ], + null, + [] + ] + */ let data = ListLiteral { list_values: vec![ ListLiteral { @@ -3765,22 +3768,17 @@ mod tests { let array = literal_to_array_ref(nested_type, data)?; - dbg!(&array); - dbg!(&array.nulls()); - // Top-level should be ListArray> let list_outer = array.as_any().downcast_ref::().unwrap(); assert_eq!(list_outer.len(), 4); // First outer element: ListArray let first_elem = list_outer.value(0); - dbg!(&first_elem); let list_inner = first_elem.as_any().downcast_ref::().unwrap(); assert_eq!(list_inner.len(), 5); // Inner values let v0 = list_inner.value(0); - dbg!(&v0); let vals0 = v0.as_any().downcast_ref::().unwrap(); assert_eq!(vals0.values(), &[1, 2, 3]); @@ -3801,8 +3799,6 @@ mod tests { let vals3 = v3.as_any().downcast_ref::().unwrap(); assert_eq!(vals3.values(), &[10, 0, 11]); - //println!("result 2 {:?}", build_array(&data)); - Ok(()) } } diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index 8f1e7cfdf0..2b91ba08d9 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -557,4 +557,12 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper assert(sql("SELECT * FROM array_tbl where arr = ARRAY(1L)").count == 1) } } + + test("native reader - support ARRAY literal ARRAY fields") { + testSingleLineQuery( + """ + |select 1 a + |""".stripMargin, + "select array(array(1, 2, null), array(), array(10), null) from tbl") + } } From cc1568e104f6500058a07914331fa3b36cf3bcf6 Mon Sep 17 00:00:00 2001 From: comphead Date: Mon, 1 Sep 2025 20:48:51 -0700 Subject: [PATCH 08/11] feat: ArrayType nested literal --- .../scala/org/apache/comet/exec/CometNativeReaderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index 2b91ba08d9..1418608f74 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -558,7 +558,7 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper } } - test("native reader - support ARRAY literal ARRAY fields") { + test("native reader - support ARRAY literal nested ARRAY fields") { testSingleLineQuery( """ |select 1 a From 93cdbc051c1da070b139ebdca6bd725be3bdabb9 Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 3 Sep 2025 13:49:41 -0700 Subject: [PATCH 09/11] feat: ArrayType nested literal --- .../scala/org/apache/comet/exec/CometNativeReaderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala index 1418608f74..63164c0fc9 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometNativeReaderSuite.scala @@ -563,6 +563,6 @@ class CometNativeReaderSuite extends CometTestBase with AdaptiveSparkPlanHelper """ |select 1 a |""".stripMargin, - "select array(array(1, 2, null), array(), array(10), null) from tbl") + "select array(array(1, 2, null), array(), array(10), null, array(null)) from tbl") } } From 118e6add020b8037cade01c59eb8629dea2a1cbf Mon Sep 17 00:00:00 2001 From: comphead Date: Wed, 3 Sep 2025 14:57:08 -0700 Subject: [PATCH 10/11] feat: ArrayType nested literal --- .../apache/comet/CometArrayExpressionSuite.scala | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala index 025cc19e1b..56d9b3b429 100644 --- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala @@ -692,4 +692,20 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp } } } + + test("array literals") { + withSQLConf( + CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true", + CometConf.COMET_EXPLAIN_FALLBACK_ENABLED.key -> "true") { + Seq(true, false).foreach { dictionaryEnabled => + withTempDir { dir => + val path = new Path(dir.toURI.toString, "test.parquet") + makeParquetFileAllPrimitiveTypes(path, dictionaryEnabled, 100) + spark.read.parquet(path.toString).createOrReplaceTempView("t1") + checkSparkAnswerAndOperator( + sql("SELECT array(array(1, 2, 3), null, array(), array(null), array(1)) from t1")) + } + } + } + } } From 19226b15185c32c131c83db9ac018dab75e05d2c Mon Sep 17 00:00:00 2001 From: comphead Date: Sat, 13 Sep 2025 12:33:03 -0700 Subject: [PATCH 11/11] upmerge --- native/core/src/execution/planner.rs | 1 + .../org/apache/comet/serde/literals.scala | 185 ++++++++++-------- 2 files changed, 102 insertions(+), 84 deletions(-) diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 35e553a6ad..1465d33ade 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -2828,6 +2828,7 @@ fn literal_to_array_ref( // Build offsets and collect non-null child arrays let mut offsets = Vec::with_capacity(list_literal.list_values.len() + 1); + // Offsets starts with 0 offsets.push(0i32); let mut child_arrays: Vec = Vec::new(); diff --git a/spark/src/main/scala/org/apache/comet/serde/literals.scala b/spark/src/main/scala/org/apache/comet/serde/literals.scala index beae0b26b6..2eae01e594 100644 --- a/spark/src/main/scala/org/apache/comet/serde/literals.scala +++ b/spark/src/main/scala/org/apache/comet/serde/literals.scala @@ -19,6 +19,8 @@ package org.apache.comet.serde.literals +import java.lang + import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal} import org.apache.spark.sql.catalyst.util.GenericArrayData @@ -27,7 +29,6 @@ import org.apache.spark.unsafe.types.UTF8String import com.google.protobuf.ByteString -import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.withInfo import org.apache.comet.DataTypeSupport.isComplexType import org.apache.comet.serde.{CometExpressionSerde, Compatible, ExprOuterClass, LiteralOuterClass, SupportLevel, Unsupported} @@ -41,13 +42,15 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging { if (supportedDataType( expr.dataType, allowComplex = expr.value == null || + // Nested literal support for native reader // can be tracked https://github.com/apache/datafusion-comet/issues/1937 - // now supports only Array of primitive - (Seq(CometConf.SCAN_NATIVE_ICEBERG_COMPAT, CometConf.SCAN_NATIVE_DATAFUSION) - .contains(CometConf.COMET_NATIVE_SCAN_IMPL.get()) && expr.dataType - .isInstanceOf[ArrayType]) && !isComplexType( - expr.dataType.asInstanceOf[ArrayType].elementType))) { + (expr.dataType + .isInstanceOf[ArrayType] && (!isComplexType( + expr.dataType.asInstanceOf[ArrayType].elementType) || expr.dataType + .asInstanceOf[ArrayType] + .elementType + .isInstanceOf[ArrayType])))) { Compatible(None) } else { Unsupported(Some(s"Unsupported data type ${expr.dataType}")) @@ -86,84 +89,10 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging { val byteStr = com.google.protobuf.ByteString.copyFrom(value.asInstanceOf[Array[Byte]]) exprBuilder.setBytesVal(byteStr) - case a: ArrayType => - val listLiteralBuilder = ListLiteral.newBuilder() - val array = value.asInstanceOf[GenericArrayData].array - a.elementType match { - case NullType => - array.foreach(_ => listLiteralBuilder.addNullMask(true)) - case BooleanType => - array.foreach(v => { - val casted = v.asInstanceOf[java.lang.Boolean] - listLiteralBuilder.addBooleanValues(casted) - listLiteralBuilder.addNullMask(casted != null) - }) - case ByteType => - array.foreach(v => { - val casted = v.asInstanceOf[java.lang.Integer] - listLiteralBuilder.addByteValues(casted) - listLiteralBuilder.addNullMask(casted != null) - }) - case ShortType => - array.foreach(v => { - val casted = v.asInstanceOf[java.lang.Short] - listLiteralBuilder.addShortValues( - if (casted != null) casted.intValue() - else null.asInstanceOf[java.lang.Integer]) - listLiteralBuilder.addNullMask(casted != null) - }) - case IntegerType | DateType => - array.foreach(v => { - val casted = v.asInstanceOf[java.lang.Integer] - listLiteralBuilder.addIntValues(casted) - listLiteralBuilder.addNullMask(casted != null) - }) - case LongType | TimestampType | TimestampNTZType => - array.foreach(v => { - val casted = v.asInstanceOf[java.lang.Long] - listLiteralBuilder.addLongValues(casted) - listLiteralBuilder.addNullMask(casted != null) - }) - case FloatType => - array.foreach(v => { - val casted = v.asInstanceOf[java.lang.Float] - listLiteralBuilder.addFloatValues(casted) - listLiteralBuilder.addNullMask(casted != null) - }) - case DoubleType => - array.foreach(v => { - val casted = v.asInstanceOf[java.lang.Double] - listLiteralBuilder.addDoubleValues(casted) - listLiteralBuilder.addNullMask(casted != null) - }) - case StringType => - array.foreach(v => { - val casted = v.asInstanceOf[org.apache.spark.unsafe.types.UTF8String] - listLiteralBuilder.addStringValues(if (casted != null) casted.toString else "") - listLiteralBuilder.addNullMask(casted != null) - }) - case _: DecimalType => - array - .foreach(v => { - val casted = - v.asInstanceOf[Decimal] - listLiteralBuilder.addDecimalValues(if (casted != null) { - com.google.protobuf.ByteString - .copyFrom(casted.toBigDecimal.underlying.unscaledValue.toByteArray) - } else ByteString.EMPTY) - listLiteralBuilder.addNullMask(casted != null) - }) - case _: BinaryType => - array - .foreach(v => { - val casted = - v.asInstanceOf[Array[Byte]] - listLiteralBuilder.addBytesValues(if (casted != null) { - com.google.protobuf.ByteString.copyFrom(casted) - } else ByteString.EMPTY) - listLiteralBuilder.addNullMask(casted != null) - }) - } + + case arr: ArrayType => + val listLiteralBuilder: ListLiteral.Builder = + makeListLiteral(value.asInstanceOf[GenericArrayData].array, arr) exprBuilder.setListVal(listLiteralBuilder.build()) exprBuilder.setDatatype(serializeDataType(dataType).get) case dt => @@ -188,4 +117,92 @@ object CometLiteral extends CometExpressionSerde[Literal] with Logging { } } + + private def makeListLiteral(array: Array[Any], arrayType: ArrayType): ListLiteral.Builder = { + val listLiteralBuilder = ListLiteral.newBuilder() + arrayType.elementType match { + case NullType => + array.foreach(_ => listLiteralBuilder.addNullMask(true)) + case BooleanType => + array.foreach(v => { + val casted = v.asInstanceOf[lang.Boolean] + listLiteralBuilder.addBooleanValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case ByteType => + array.foreach(v => { + val casted = v.asInstanceOf[Integer] + listLiteralBuilder.addByteValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case ShortType => + array.foreach(v => { + val casted = v.asInstanceOf[lang.Short] + listLiteralBuilder.addShortValues( + if (casted != null) casted.intValue() + else null.asInstanceOf[Integer]) + listLiteralBuilder.addNullMask(casted != null) + }) + case IntegerType | DateType => + array.foreach(v => { + val casted = v.asInstanceOf[Integer] + listLiteralBuilder.addIntValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case LongType | TimestampType | TimestampNTZType => + array.foreach(v => { + val casted = v.asInstanceOf[lang.Long] + listLiteralBuilder.addLongValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case FloatType => + array.foreach(v => { + val casted = v.asInstanceOf[lang.Float] + listLiteralBuilder.addFloatValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case DoubleType => + array.foreach(v => { + val casted = v.asInstanceOf[lang.Double] + listLiteralBuilder.addDoubleValues(casted) + listLiteralBuilder.addNullMask(casted != null) + }) + case StringType => + array.foreach(v => { + val casted = v.asInstanceOf[UTF8String] + listLiteralBuilder.addStringValues(if (casted != null) casted.toString else "") + listLiteralBuilder.addNullMask(casted != null) + }) + case _: DecimalType => + array + .foreach(v => { + val casted = + v.asInstanceOf[Decimal] + listLiteralBuilder.addDecimalValues(if (casted != null) { + com.google.protobuf.ByteString + .copyFrom(casted.toBigDecimal.underlying.unscaledValue.toByteArray) + } else ByteString.EMPTY) + listLiteralBuilder.addNullMask(casted != null) + }) + case _: BinaryType => + array + .foreach(v => { + val casted = + v.asInstanceOf[Array[Byte]] + listLiteralBuilder.addBytesValues(if (casted != null) { + com.google.protobuf.ByteString.copyFrom(casted) + } else ByteString.EMPTY) + listLiteralBuilder.addNullMask(casted != null) + }) + case a: ArrayType => + array.foreach(v => { + val casted = v.asInstanceOf[GenericArrayData] + listLiteralBuilder.addListValues(if (casted != null) { + makeListLiteral(casted.array, a) + } else ListLiteral.newBuilder()) + listLiteralBuilder.addNullMask(casted != null) + }) + } + listLiteralBuilder + } }