From 9c05825d1c7261aab52030ee0e1ad2ca17d4cd1d Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 27 Oct 2025 11:15:15 +0200 Subject: [PATCH 1/3] refactor: remove dependency on arrow_ord --- arrow-cast/Cargo.toml | 1 - arrow-cast/src/cast/run_array.rs | 26 +++++++++++++++----------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/arrow-cast/Cargo.toml b/arrow-cast/Cargo.toml index fb5ad1af3d3a..1e698833eaa0 100644 --- a/arrow-cast/Cargo.toml +++ b/arrow-cast/Cargo.toml @@ -43,7 +43,6 @@ force_validate = [] arrow-array = { workspace = true } arrow-buffer = { workspace = true } arrow-data = { workspace = true } -arrow-ord = { workspace = true } arrow-schema = { workspace = true } arrow-select = { workspace = true } chrono = { workspace = true } diff --git a/arrow-cast/src/cast/run_array.rs b/arrow-cast/src/cast/run_array.rs index 0d4679d9f3f5..67a65bf08ef2 100644 --- a/arrow-cast/src/cast/run_array.rs +++ b/arrow-cast/src/cast/run_array.rs @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. +use std::vec; + use crate::cast::*; -use arrow_ord::partition::partition; +use arrow_array::Array; /// Attempts to cast a `RunArray` with index type K into /// `to_type` for supported types. @@ -134,17 +136,19 @@ pub(crate) fn cast_to_run_end_encoded( )); } - // Partition the array to identify runs of consecutive equal values - let partitions = partition(&[Arc::clone(cast_array)])?; - let size = partitions.len(); - let mut run_ends = Vec::with_capacity(size); - let mut values_indexes = Vec::with_capacity(size); - let mut last_partition_end = 0; - for partition in partitions.ranges() { - values_indexes.push(last_partition_end); - run_ends.push(partition.end); - last_partition_end = partition.end; + // Identify run boundaries by comparing consecutive values + let mut run_ends = Vec::new(); + let mut values_indexes = vec![0usize]; // Always include the first index + let mut current_data = cast_array.slice(0, 1).to_data(); + for idx in 1..cast_array.len() { + let next_data = cast_array.slice(idx, 1).to_data(); + if current_data != next_data { + run_ends.push(idx); + values_indexes.push(idx); + current_data = next_data; + } } + run_ends.push(cast_array.len()); // Build the run_ends array for run_end in run_ends { From 6b76708a27898c6a7967d21973d09f3b8d1c30b2 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Mon, 27 Oct 2025 18:47:31 +0200 Subject: [PATCH 2/3] chore --- arrow-cast/src/cast/run_array.rs | 502 ++++++++++++++++++++++++++++++- 1 file changed, 487 insertions(+), 15 deletions(-) diff --git a/arrow-cast/src/cast/run_array.rs b/arrow-cast/src/cast/run_array.rs index 67a65bf08ef2..ee8260048735 100644 --- a/arrow-cast/src/cast/run_array.rs +++ b/arrow-cast/src/cast/run_array.rs @@ -15,10 +15,21 @@ // specific language governing permissions and limitations // under the License. -use std::vec; - use crate::cast::*; -use arrow_array::Array; +use arrow_array::cast::AsArray; +use arrow_array::types::{ + ArrowDictionaryKeyType, ArrowPrimitiveType, Date32Type, Date64Type, Decimal128Type, + Decimal256Type, DurationMicrosecondType, DurationMillisecondType, DurationNanosecondType, + DurationSecondType, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, + Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, + Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, + TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, + TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type, +}; +use arrow_array::{ + Array, ArrayRef, BinaryViewArray, BooleanArray, DictionaryArray, FixedSizeBinaryArray, + GenericBinaryArray, GenericStringArray, PrimitiveArray, StringViewArray, +}; /// Attempts to cast a `RunArray` with index type K into /// `to_type` for supported types. @@ -137,18 +148,7 @@ pub(crate) fn cast_to_run_end_encoded( } // Identify run boundaries by comparing consecutive values - let mut run_ends = Vec::new(); - let mut values_indexes = vec![0usize]; // Always include the first index - let mut current_data = cast_array.slice(0, 1).to_data(); - for idx in 1..cast_array.len() { - let next_data = cast_array.slice(idx, 1).to_data(); - if current_data != next_data { - run_ends.push(idx); - values_indexes.push(idx); - current_data = next_data; - } - } - run_ends.push(cast_array.len()); + let (run_ends, values_indexes) = compute_run_boundaries(cast_array); // Build the run_ends array for run_end in run_ends { @@ -167,3 +167,475 @@ pub(crate) fn cast_to_run_end_encoded( let run_array = RunArray::::try_new(&run_ends_array, values_array.as_ref())?; Ok(Arc::new(run_array)) } + +fn compute_run_boundaries(array: &ArrayRef) -> (Vec, Vec) { + if array.is_empty() { + return (Vec::new(), Vec::new()); + } + + use arrow_schema::{DataType::*, IntervalUnit, TimeUnit}; + + match array.data_type() { + Null => (vec![array.len()], vec![0]), + Boolean => runs_for_boolean(array.as_boolean()), + Int8 => runs_for_primitive(array.as_primitive::()), + Int16 => runs_for_primitive(array.as_primitive::()), + Int32 => runs_for_primitive(array.as_primitive::()), + Int64 => runs_for_primitive(array.as_primitive::()), + UInt8 => runs_for_primitive(array.as_primitive::()), + UInt16 => runs_for_primitive(array.as_primitive::()), + UInt32 => runs_for_primitive(array.as_primitive::()), + UInt64 => runs_for_primitive(array.as_primitive::()), + Float16 => runs_for_primitive(array.as_primitive::()), + Float32 => runs_for_primitive(array.as_primitive::()), + Float64 => runs_for_primitive(array.as_primitive::()), + Date32 => runs_for_primitive(array.as_primitive::()), + Date64 => runs_for_primitive(array.as_primitive::()), + Time32(TimeUnit::Second) => runs_for_primitive(array.as_primitive::()), + Time32(TimeUnit::Millisecond) => { + runs_for_primitive(array.as_primitive::()) + } + Time64(TimeUnit::Microsecond) => { + runs_for_primitive(array.as_primitive::()) + } + Time64(TimeUnit::Nanosecond) => { + runs_for_primitive(array.as_primitive::()) + } + Duration(TimeUnit::Second) => { + runs_for_primitive(array.as_primitive::()) + } + Duration(TimeUnit::Millisecond) => { + runs_for_primitive(array.as_primitive::()) + } + Duration(TimeUnit::Microsecond) => { + runs_for_primitive(array.as_primitive::()) + } + Duration(TimeUnit::Nanosecond) => { + runs_for_primitive(array.as_primitive::()) + } + Timestamp(TimeUnit::Second, _) => { + runs_for_primitive(array.as_primitive::()) + } + Timestamp(TimeUnit::Millisecond, _) => { + runs_for_primitive(array.as_primitive::()) + } + Timestamp(TimeUnit::Microsecond, _) => { + runs_for_primitive(array.as_primitive::()) + } + Timestamp(TimeUnit::Nanosecond, _) => { + runs_for_primitive(array.as_primitive::()) + } + Interval(IntervalUnit::YearMonth) => { + runs_for_primitive(array.as_primitive::()) + } + Interval(IntervalUnit::DayTime) => { + runs_for_primitive(array.as_primitive::()) + } + Interval(IntervalUnit::MonthDayNano) => { + runs_for_primitive(array.as_primitive::()) + } + Decimal128(_, _) => runs_for_primitive(array.as_primitive::()), + Decimal256(_, _) => runs_for_primitive(array.as_primitive::()), + Utf8 => runs_for_string_i32(array.as_string::()), + LargeUtf8 => runs_for_string_i64(array.as_string::()), + Utf8View => runs_for_string_view(array.as_string_view()), + Binary => runs_for_binary_i32(array.as_binary::()), + LargeBinary => runs_for_binary_i64(array.as_binary::()), + BinaryView => runs_for_binary_view(array.as_binary_view()), + FixedSizeBinary(_) => runs_for_fixed_size_binary(array.as_fixed_size_binary()), + Dictionary(key_type, _) => match key_type.as_ref() { + Int8 => runs_for_dictionary::(array.as_dictionary()), + Int16 => runs_for_dictionary::(array.as_dictionary()), + Int32 => runs_for_dictionary::(array.as_dictionary()), + Int64 => runs_for_dictionary::(array.as_dictionary()), + UInt8 => runs_for_dictionary::(array.as_dictionary()), + UInt16 => runs_for_dictionary::(array.as_dictionary()), + UInt32 => runs_for_dictionary::(array.as_dictionary()), + UInt64 => runs_for_dictionary::(array.as_dictionary()), + _ => runs_generic(array.as_ref()), + }, + _ => runs_generic(array.as_ref()), + } +} + +fn runs_for_boolean(array: &BooleanArray) -> (Vec, Vec) { + let len = array.len(); + if let Some(runs) = trivial_runs(len) { + return runs; + } + + let mut run_boundaries = Vec::with_capacity(len / 64 + 2); + let mut current_valid = array.is_valid(0); + let mut current_value = if current_valid { array.value(0) } else { false }; + + for idx in 1..len { + let valid = array.is_valid(idx); + let mut boundary = false; + if current_valid && valid { + let value = array.value(idx); + if value != current_value { + current_value = value; + boundary = true; + } + } else if current_valid != valid { + boundary = true; + if valid { + current_value = array.value(idx); + } + } + + if boundary { + ensure_capacity(&mut run_boundaries, len); + run_boundaries.push(idx); + } + current_valid = valid; + } + + finalize_runs(run_boundaries, len) +} + +fn runs_for_primitive( + array: &PrimitiveArray, +) -> (Vec, Vec) { + let len = array.len(); + if let Some(runs) = trivial_runs(len) { + return runs; + } + + let values = array.values(); + let mut run_boundaries = Vec::with_capacity(len / 64 + 2); + + if array.null_count() == 0 { + let mut current = unsafe { *values.get_unchecked(0) }; + let mut idx = 1; + while idx < len { + let boundary = scan_run_end::(values, current, idx); + if boundary == len { + break; + } + ensure_capacity(&mut run_boundaries, len); + run_boundaries.push(boundary); + current = unsafe { *values.get_unchecked(boundary) }; + idx = boundary + 1; + } + return finalize_runs(run_boundaries, len); + } + + let nulls = array + .nulls() + .expect("null_count > 0 implies a null buffer is present"); + let mut current_valid = nulls.is_valid(0); + let mut current_value = unsafe { *values.get_unchecked(0) }; + for idx in 1..len { + let valid = nulls.is_valid(idx); + let mut boundary = false; + if current_valid && valid { + let value = unsafe { *values.get_unchecked(idx) }; + if value != current_value { + current_value = value; + boundary = true; + } + } else if current_valid != valid { + boundary = true; + if valid { + current_value = unsafe { *values.get_unchecked(idx) }; + } + } + if boundary { + ensure_capacity(&mut run_boundaries, len); + run_boundaries.push(idx); + } + current_valid = valid; + } + finalize_runs(run_boundaries, len) +} + +fn runs_for_binary_i32(array: &GenericBinaryArray) -> (Vec, Vec) { + let mut to_usize = |v: i32| v as usize; + runs_for_binary_like( + array.len(), + array.null_count(), + array.value_offsets(), + array.value_data(), + |idx| array.is_valid(idx), + &mut to_usize, + ) +} + +fn runs_for_binary_i64(array: &GenericBinaryArray) -> (Vec, Vec) { + let mut to_usize = |v: i64| v as usize; + runs_for_binary_like( + array.len(), + array.null_count(), + array.value_offsets(), + array.value_data(), + |idx| array.is_valid(idx), + &mut to_usize, + ) +} + +fn runs_for_binary_like( + len: usize, + null_count: usize, + offsets: &[T], + values: &[u8], + mut is_valid: impl FnMut(usize) -> bool, + to_usize: &mut impl FnMut(T) -> usize, +) -> (Vec, Vec) { + if let Some(runs) = trivial_runs(len) { + return runs; + } + + let mut run_boundaries = Vec::with_capacity(len / 64 + 2); + + if null_count == 0 { + let mut current_start = to_usize(offsets[0]); + let mut current_end = to_usize(offsets[1]); + for idx in 1..len { + let start = to_usize(offsets[idx]); + let end = to_usize(offsets[idx + 1]); + if (end - start) != (current_end - current_start) + || values[start..end] != values[current_start..current_end] + { + ensure_capacity(&mut run_boundaries, len); + run_boundaries.push(idx); + current_start = start; + current_end = end; + } + } + } else { + let mut current_valid = is_valid(0); + let mut current_range = (to_usize(offsets[0]), to_usize(offsets[1])); + for idx in 1..len { + let valid = is_valid(idx); + let mut boundary = false; + if current_valid && valid { + let start = to_usize(offsets[idx]); + let end = to_usize(offsets[idx + 1]); + let (current_start, current_end) = current_range; + if (end - start) != (current_end - current_start) + || values[start..end] != values[current_start..current_end] + { + boundary = true; + current_range = (start, end); + } + } else if current_valid != valid { + boundary = true; + if valid { + current_range = (to_usize(offsets[idx]), to_usize(offsets[idx + 1])); + } + } + if boundary { + ensure_capacity(&mut run_boundaries, len); + run_boundaries.push(idx); + } + current_valid = valid; + } + } + + finalize_runs(run_boundaries, len) +} + +fn runs_for_string_i32(array: &GenericStringArray) -> (Vec, Vec) { + let mut to_usize = |v: i32| v as usize; + runs_for_binary_like( + array.len(), + array.null_count(), + array.value_offsets(), + array.value_data(), + |idx| array.is_valid(idx), + &mut to_usize, + ) +} + +fn runs_for_string_i64(array: &GenericStringArray) -> (Vec, Vec) { + let mut to_usize = |v: i64| v as usize; + runs_for_binary_like( + array.len(), + array.null_count(), + array.value_offsets(), + array.value_data(), + |idx| array.is_valid(idx), + &mut to_usize, + ) +} + +fn runs_for_string_view(array: &StringViewArray) -> (Vec, Vec) { + runs_generic(array) +} + +fn runs_for_binary_view(array: &BinaryViewArray) -> (Vec, Vec) { + runs_generic(array) +} + +fn runs_for_fixed_size_binary(array: &FixedSizeBinaryArray) -> (Vec, Vec) { + let len = array.len(); + if let Some(runs) = trivial_runs(len) { + return runs; + } + + let width = array.value_length() as usize; + let values = array.value_data(); + let mut run_boundaries = Vec::with_capacity(len / 64 + 2); + if array.null_count() == 0 { + let mut current_slice = &values[0..width]; + for idx in 1..len { + let start = idx * width; + let slice = &values[start..start + width]; + if slice != current_slice { + ensure_capacity(&mut run_boundaries, len); + run_boundaries.push(idx); + current_slice = slice; + } + } + } else { + let nulls = array + .nulls() + .expect("null_count > 0 implies a null buffer is present"); + let mut current_valid = nulls.is_valid(0); + let mut current_slice = &values[0..width]; + for idx in 1..len { + let valid = nulls.is_valid(idx); + let mut boundary = false; + if current_valid && valid { + let start = idx * width; + let slice = &values[start..start + width]; + if slice != current_slice { + boundary = true; + current_slice = slice; + } + } else if current_valid != valid { + boundary = true; + if valid { + let start = idx * width; + current_slice = &values[start..start + width]; + } + } + if boundary { + ensure_capacity(&mut run_boundaries, len); + run_boundaries.push(idx); + } + current_valid = valid; + } + } + + finalize_runs(run_boundaries, len) +} + +fn runs_for_dictionary( + array: &DictionaryArray, +) -> (Vec, Vec) { + runs_for_primitive(array.keys()) +} + +fn runs_generic(array: &dyn Array) -> (Vec, Vec) { + let len = array.len(); + if let Some(runs) = trivial_runs(len) { + return runs; + } + + let mut run_boundaries = Vec::with_capacity(len / 64 + 2); + let mut current_data = array.slice(0, 1).to_data(); + for idx in 1..len { + let next_data = array.slice(idx, 1).to_data(); + if current_data != next_data { + ensure_capacity(&mut run_boundaries, len); + run_boundaries.push(idx); + current_data = next_data; + } + } + + finalize_runs(run_boundaries, len) +} + +fn trivial_runs(len: usize) -> Option<(Vec, Vec)> { + match len { + 0 => Some((Vec::new(), Vec::new())), + 1 => Some((vec![1], vec![0])), + _ => None, + } +} + +#[inline] +fn ensure_capacity(vec: &mut Vec, total_len: usize) { + if vec.len() == vec.capacity() { + let remaining = total_len.saturating_sub(vec.len()); + vec.reserve(remaining.max(1)); + } +} + +fn finalize_runs(mut run_boundaries: Vec, len: usize) -> (Vec, Vec) { + let mut values_indexes = Vec::with_capacity(run_boundaries.len() + 1); + values_indexes.push(0); + values_indexes.extend_from_slice(&run_boundaries); + run_boundaries.push(len); + (run_boundaries, values_indexes) +} + +#[inline] +fn scan_run_end( + values: &[T::Native], + current: T::Native, + start: usize, +) -> usize { + let element_size = std::mem::size_of::(); + if element_size <= 8 && 16 % element_size == 0 { + let elements_per_chunk = 16 / element_size; + return scan_run_end_chunk::(values, current, start, elements_per_chunk, element_size); + } + scan_run_end_scalar::(values, current, start) +} + +#[inline] +fn scan_run_end_chunk( + values: &[T::Native], + current: T::Native, + start: usize, + elements_per_chunk: usize, + element_size: usize, +) -> usize { + let len = values.len(); + let mut idx = start; + if idx >= len { + return len; + } + + let mut pattern_bytes = [0u8; 16]; + unsafe { + let value_bytes = + std::slice::from_raw_parts(¤t as *const T::Native as *const u8, element_size); + for chunk in pattern_bytes.chunks_mut(element_size) { + chunk.copy_from_slice(value_bytes); + } + } + let pattern = u128::from_ne_bytes(pattern_bytes); + + while idx + elements_per_chunk <= len { + let chunk = unsafe { (values.as_ptr().add(idx) as *const u128).read_unaligned() }; + if chunk != pattern { + for offset in 0..elements_per_chunk { + let value = unsafe { *values.get_unchecked(idx + offset) }; + if value != current { + return idx + offset; + } + } + unreachable!("chunk mismatch without locating differing element"); + } + idx += elements_per_chunk; + } + + scan_run_end_scalar::(values, current, idx) +} + +#[inline] +fn scan_run_end_scalar( + values: &[T::Native], + current: T::Native, + mut idx: usize, +) -> usize { + let len = values.len(); + while idx < len && unsafe { *values.get_unchecked(idx) } == current { + idx += 1; + } + idx +} From 4fd7761632d5dd0bc8a401303551fb5f22fcc690 Mon Sep 17 00:00:00 2001 From: Weijun-H Date: Tue, 28 Oct 2025 10:16:48 +0200 Subject: [PATCH 3/3] chore: Added comments --- arrow-cast/src/cast/run_array.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/arrow-cast/src/cast/run_array.rs b/arrow-cast/src/cast/run_array.rs index ee8260048735..7c7039a6884d 100644 --- a/arrow-cast/src/cast/run_array.rs +++ b/arrow-cast/src/cast/run_array.rs @@ -38,6 +38,9 @@ pub(crate) fn run_end_encoded_cast( to_type: &DataType, cast_options: &CastOptions, ) -> Result { + // Fast-path dispatch: most physical types can reuse `runs_for_primitive`, the remainder fall + // through to specialized implementations below. + // Route to the most specialized helper for the physical layout of `array`. match array.data_type() { DataType::RunEndEncoded(_, _) => { let run_array = array @@ -269,6 +272,7 @@ fn runs_for_boolean(array: &BooleanArray) -> (Vec, Vec) { let mut current_value = if current_valid { array.value(0) } else { false }; for idx in 1..len { + // Treat a change in validity the same as a change in value so null boundaries are recorded. let valid = array.is_valid(idx); let mut boundary = false; if current_valid && valid { @@ -309,6 +313,7 @@ fn runs_for_primitive( let mut current = unsafe { *values.get_unchecked(0) }; let mut idx = 1; while idx < len { + // Attempt to advance in 16-byte chunks before falling back to scalar comparison. let boundary = scan_run_end::(values, current, idx); if boundary == len { break; @@ -394,6 +399,7 @@ fn runs_for_binary_like( for idx in 1..len { let start = to_usize(offsets[idx]); let end = to_usize(offsets[idx + 1]); + // Any difference in byte length or payload means a new run. if (end - start) != (current_end - current_start) || values[start..end] != values[current_start..current_end] { @@ -413,6 +419,7 @@ fn runs_for_binary_like( let start = to_usize(offsets[idx]); let end = to_usize(offsets[idx + 1]); let (current_start, current_end) = current_range; + // Keep reusing the current byte-range as long as both validity and payload match. if (end - start) != (current_end - current_start) || values[start..end] != values[current_start..current_end] { @@ -482,6 +489,7 @@ fn runs_for_fixed_size_binary(array: &FixedSizeBinaryArray) -> (Vec, Vec< for idx in 1..len { let start = idx * width; let slice = &values[start..start + width]; + // Width is constant, so a simple byte slice comparison suffices. if slice != current_slice { ensure_capacity(&mut run_boundaries, len); run_boundaries.push(idx); @@ -538,6 +546,7 @@ fn runs_generic(array: &dyn Array) -> (Vec, Vec) { let mut current_data = array.slice(0, 1).to_data(); for idx in 1..len { let next_data = array.slice(idx, 1).to_data(); + // Fallback for exotic types: compare `ArrayData` views directly. if current_data != next_data { ensure_capacity(&mut run_boundaries, len); run_boundaries.push(idx); @@ -566,6 +575,7 @@ fn ensure_capacity(vec: &mut Vec, total_len: usize) { fn finalize_runs(mut run_boundaries: Vec, len: usize) -> (Vec, Vec) { let mut values_indexes = Vec::with_capacity(run_boundaries.len() + 1); + // Values array always pulls the first element of each run; index 0 is by definition a run start. values_indexes.push(0); values_indexes.extend_from_slice(&run_boundaries); run_boundaries.push(len); @@ -579,6 +589,7 @@ fn scan_run_end( start: usize, ) -> usize { let element_size = std::mem::size_of::(); + // Only attempt the chunked search when the element size divides evenly into 16 bytes. if element_size <= 8 && 16 % element_size == 0 { let elements_per_chunk = 16 / element_size; return scan_run_end_chunk::(values, current, start, elements_per_chunk, element_size); @@ -601,6 +612,9 @@ fn scan_run_end_chunk( } let mut pattern_bytes = [0u8; 16]; + // Safety: `T::Native` is guaranteed by `ArrowPrimitiveType` to have a plain-old-data layout, + // allowing the value to be viewed as raw bytes. We copy exactly `element_size` bytes, so the + // slice built from `current` stays within bounds. unsafe { let value_bytes = std::slice::from_raw_parts(¤t as *const T::Native as *const u8, element_size); @@ -611,6 +625,7 @@ fn scan_run_end_chunk( let pattern = u128::from_ne_bytes(pattern_bytes); while idx + elements_per_chunk <= len { + // SAFETY: pointer arithmetic stays within the backing slice; unaligned reads are allowed. let chunk = unsafe { (values.as_ptr().add(idx) as *const u128).read_unaligned() }; if chunk != pattern { for offset in 0..elements_per_chunk { @@ -619,7 +634,6 @@ fn scan_run_end_chunk( return idx + offset; } } - unreachable!("chunk mismatch without locating differing element"); } idx += elements_per_chunk; }