diff --git a/crates/iceberg/src/arrow/value.rs b/crates/iceberg/src/arrow/value.rs index 30b47d83fc..d07233c420 100644 --- a/crates/iceberg/src/arrow/value.rs +++ b/crates/iceberg/src/arrow/value.rs @@ -24,7 +24,7 @@ use arrow_array::{ Time64MicrosecondArray, TimestampMicrosecondArray, TimestampNanosecondArray, }; use arrow_buffer::NullBuffer; -use arrow_schema::{DataType, FieldRef}; +use arrow_schema::{DataType, FieldRef, TimeUnit}; use uuid::Uuid; use super::get_field_id_from_metadata; @@ -645,6 +645,38 @@ pub(crate) fn create_primitive_array_single_element( Ok(Arc::new(Int64Array::from(vec![*v]))) } (DataType::Int64, None) => Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None]))), + (DataType::Timestamp(TimeUnit::Microsecond, timezone), Some(PrimitiveLiteral::Long(v))) => { + let array = TimestampMicrosecondArray::from(vec![*v]); + if let Some(timezone) = timezone { + Ok(Arc::new(array.with_timezone(timezone.clone()))) + } else { + Ok(Arc::new(array)) + } + } + (DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => { + let array = TimestampMicrosecondArray::from(vec![Option::<i64>::None]); + if let Some(timezone) = timezone { + Ok(Arc::new(array.with_timezone(timezone.clone()))) + } else { + Ok(Arc::new(array)) + } + } + (DataType::Timestamp(TimeUnit::Nanosecond, timezone), Some(PrimitiveLiteral::Long(v))) => { + let array = TimestampNanosecondArray::from(vec![*v]); + if let Some(timezone) = timezone { + Ok(Arc::new(array.with_timezone(timezone.clone()))) + } else { + Ok(Arc::new(array)) + } + } + (DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => { + let array = TimestampNanosecondArray::from(vec![Option::<i64>::None]); + if let Some(timezone) = timezone { + Ok(Arc::new(array.with_timezone(timezone.clone()))) + } else { + Ok(Arc::new(array)) + } + } (DataType::Float32, Some(PrimitiveLiteral::Float(v))) => { Ok(Arc::new(Float32Array::from(vec![v.0]))) } @@ -720,6 +752,22 @@ pub(crate) fn create_primitive_array_single_element( DataType::Int64 => { 
Ok(Arc::new(Int64Array::from(vec![Option::<i64>::None])) as ArrayRef) } + DataType::Timestamp(TimeUnit::Microsecond, timezone) => { + let array = TimestampMicrosecondArray::from(vec![Option::<i64>::None]); + if let Some(timezone) = timezone { + Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef) + } else { + Ok(Arc::new(array) as ArrayRef) + } + } + DataType::Timestamp(TimeUnit::Nanosecond, timezone) => { + let array = TimestampNanosecondArray::from(vec![Option::<i64>::None]); + if let Some(timezone) = timezone { + Ok(Arc::new(array.with_timezone(timezone.clone())) as ArrayRef) + } else { + Ok(Arc::new(array) as ArrayRef) + } + } DataType::Float32 => { Ok(Arc::new(Float32Array::from(vec![Option::<f32>::None])) as ArrayRef) } @@ -793,6 +841,46 @@ pub(crate) fn create_primitive_array_repeated( let vals: Vec<Option<i64>> = vec![None; num_rows]; Arc::new(Int64Array::from(vals)) } + ( + DataType::Timestamp(TimeUnit::Microsecond, timezone), + Some(PrimitiveLiteral::Long(value)), + ) => { + let array = TimestampMicrosecondArray::from(vec![*value; num_rows]); + if let Some(timezone) = timezone { + Arc::new(array.with_timezone(timezone.clone())) + } else { + Arc::new(array) + } + } + (DataType::Timestamp(TimeUnit::Microsecond, timezone), None) => { + let vals: Vec<Option<i64>> = vec![None; num_rows]; + let array = TimestampMicrosecondArray::from(vals); + if let Some(timezone) = timezone { + Arc::new(array.with_timezone(timezone.clone())) + } else { + Arc::new(array) + } + } + ( + DataType::Timestamp(TimeUnit::Nanosecond, timezone), + Some(PrimitiveLiteral::Long(value)), + ) => { + let array = TimestampNanosecondArray::from(vec![*value; num_rows]); + if let Some(timezone) = timezone { + Arc::new(array.with_timezone(timezone.clone())) + } else { + Arc::new(array) + } + } + (DataType::Timestamp(TimeUnit::Nanosecond, timezone), None) => { + let vals: Vec<Option<i64>> = vec![None; num_rows]; + let array = TimestampNanosecondArray::from(vals); + if let Some(timezone) = timezone { + 
Arc::new(array.with_timezone(timezone.clone())) + } else { + Arc::new(array) + } + } (DataType::Float32, Some(PrimitiveLiteral::Float(value))) => { Arc::new(Float32Array::from(vec![value.0; num_rows])) } @@ -1781,4 +1869,30 @@ mod test { assert_eq!(array.len(), num_rows); } + + #[test] + fn test_create_timestamp_microsecond_array_repeated() { + let target_type = DataType::Timestamp(TimeUnit::Microsecond, None); + let value = PrimitiveLiteral::Long(1_740_600_000_000_000); + let num_rows = 3; + + let array = create_primitive_array_repeated(&target_type, &Some(value), num_rows) + .expect("Failed to create repeated timestamp microsecond array"); + + assert_eq!(array.data_type(), &target_type); + assert_eq!(array.len(), num_rows); + } + + #[test] + fn test_create_timestamp_microsecond_with_timezone_array_repeated() { + let target_type = DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())); + let value = PrimitiveLiteral::Long(1_740_600_000_000_000); + let num_rows = 2; + + let array = create_primitive_array_repeated(&target_type, &Some(value), num_rows) + .expect("Failed to create repeated timestamp microsecond array with timezone"); + + assert_eq!(array.data_type(), &target_type); + assert_eq!(array.len(), num_rows); + } }