Skip to content

Commit 068f96f

Browse files
Hash UnionArrays (#18718)
## Which issue does this PR close? - Closes #18717 ## Rationale for this change This PR adds hash support for Union data types, enabling group by, distinct, hash joins, and aggregations on union-typed columns `hash_union_array` hashes each child array once. Then for every row, it uses the type id and offset to retrieve the appropriate hash value
1 parent 3b390d7 commit 068f96f

File tree

1 file changed

+154
-1
lines changed

1 file changed

+154
-1
lines changed

datafusion/common/src/hash_utils.rs

Lines changed: 154 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use arrow::{downcast_dictionary_array, downcast_primitive_array};
2828
use crate::cast::{
2929
as_binary_view_array, as_boolean_array, as_fixed_size_list_array,
3030
as_generic_binary_array, as_large_list_array, as_list_array, as_map_array,
31-
as_string_array, as_string_view_array, as_struct_array,
31+
as_string_array, as_string_view_array, as_struct_array, as_union_array,
3232
};
3333
use crate::error::Result;
3434
use crate::error::{_internal_datafusion_err, _internal_err};
@@ -417,6 +417,40 @@ where
417417
Ok(())
418418
}
419419

420+
#[cfg(not(feature = "force_hash_collisions"))]
421+
fn hash_union_array(
422+
array: &UnionArray,
423+
random_state: &RandomState,
424+
hashes_buffer: &mut [u64],
425+
) -> Result<()> {
426+
use std::collections::HashMap;
427+
428+
let DataType::Union(union_fields, _mode) = array.data_type() else {
429+
unreachable!()
430+
};
431+
432+
let mut child_hashes = HashMap::with_capacity(union_fields.len());
433+
434+
for (type_id, _field) in union_fields.iter() {
435+
let child = array.child(type_id);
436+
let mut child_hash_buffer = vec![0; child.len()];
437+
create_hashes([child], random_state, &mut child_hash_buffer)?;
438+
439+
child_hashes.insert(type_id, child_hash_buffer);
440+
}
441+
442+
#[expect(clippy::needless_range_loop)]
443+
for i in 0..array.len() {
444+
let type_id = array.type_id(i);
445+
let child_offset = array.value_offset(i);
446+
447+
let child_hash = child_hashes.get(&type_id).expect("invalid type_id");
448+
hashes_buffer[i] = combine_hashes(hashes_buffer[i], child_hash[child_offset]);
449+
}
450+
451+
Ok(())
452+
}
453+
420454
#[cfg(not(feature = "force_hash_collisions"))]
421455
fn hash_fixed_list_array(
422456
array: &FixedSizeListArray,
@@ -497,6 +531,10 @@ fn hash_single_array(
497531
let array = as_fixed_size_list_array(array)?;
498532
hash_fixed_list_array(array, random_state, hashes_buffer)?;
499533
}
534+
DataType::Union(_, _) => {
535+
let array = as_union_array(array)?;
536+
hash_union_array(array, random_state, hashes_buffer)?;
537+
}
500538
_ => {
501539
// This is internal because we should have caught this before.
502540
return _internal_err!(
@@ -1168,4 +1206,119 @@ mod tests {
11681206
"Error message should mention reentrancy: {err_msg}",
11691207
);
11701208
}
1209+
1210+
#[test]
1211+
#[cfg(not(feature = "force_hash_collisions"))]
1212+
fn create_hashes_for_sparse_union_arrays() {
1213+
// logical array: [int(5), str("foo"), int(10), int(5)]
1214+
let int_array = Int32Array::from(vec![Some(5), None, Some(10), Some(5)]);
1215+
let str_array = StringArray::from(vec![None, Some("foo"), None, None]);
1216+
1217+
let type_ids = vec![0_i8, 1, 0, 0].into();
1218+
let children = vec![
1219+
Arc::new(int_array) as ArrayRef,
1220+
Arc::new(str_array) as ArrayRef,
1221+
];
1222+
1223+
let union_fields = [
1224+
(0, Arc::new(Field::new("a", DataType::Int32, true))),
1225+
(1, Arc::new(Field::new("b", DataType::Utf8, true))),
1226+
]
1227+
.into_iter()
1228+
.collect();
1229+
1230+
let array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
1231+
let array_ref = Arc::new(array) as ArrayRef;
1232+
1233+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1234+
let mut hashes = vec![0; array_ref.len()];
1235+
create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
1236+
1237+
// Rows 0 and 3 both have type_id=0 (int) with value 5
1238+
assert_eq!(hashes[0], hashes[3]);
1239+
// Row 0 (int 5) vs Row 2 (int 10) - different values
1240+
assert_ne!(hashes[0], hashes[2]);
1241+
// Row 0 (int) vs Row 1 (string) - different types
1242+
assert_ne!(hashes[0], hashes[1]);
1243+
}
1244+
1245+
#[test]
1246+
#[cfg(not(feature = "force_hash_collisions"))]
1247+
fn create_hashes_for_sparse_union_arrays_with_nulls() {
1248+
// logical array: [int(5), str("foo"), int(null), str(null)]
1249+
let int_array = Int32Array::from(vec![Some(5), None, None, None]);
1250+
let str_array = StringArray::from(vec![None, Some("foo"), None, None]);
1251+
1252+
let type_ids = vec![0, 1, 0, 1].into();
1253+
let children = vec![
1254+
Arc::new(int_array) as ArrayRef,
1255+
Arc::new(str_array) as ArrayRef,
1256+
];
1257+
1258+
let union_fields = [
1259+
(0, Arc::new(Field::new("a", DataType::Int32, true))),
1260+
(1, Arc::new(Field::new("b", DataType::Utf8, true))),
1261+
]
1262+
.into_iter()
1263+
.collect();
1264+
1265+
let array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
1266+
let array_ref = Arc::new(array) as ArrayRef;
1267+
1268+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1269+
let mut hashes = vec![0; array_ref.len()];
1270+
create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
1271+
1272+
// row 2 (int null) and row 3 (str null) should have the same hash
1273+
// because they are both null values
1274+
assert_eq!(hashes[2], hashes[3]);
1275+
1276+
// row 0 (int 5) vs row 2 (int null) - different (value vs null)
1277+
assert_ne!(hashes[0], hashes[2]);
1278+
1279+
// row 1 (str "foo") vs row 3 (str null) - different (value vs null)
1280+
assert_ne!(hashes[1], hashes[3]);
1281+
}
1282+
1283+
#[test]
1284+
#[cfg(not(feature = "force_hash_collisions"))]
1285+
fn create_hashes_for_dense_union_arrays() {
1286+
// creates a dense union array with int and string types
1287+
// [67, "norm", 100, "macdonald", 67]
1288+
let int_array = Int32Array::from(vec![67, 100, 67]);
1289+
let str_array = StringArray::from(vec!["norm", "macdonald"]);
1290+
1291+
let type_ids = vec![0, 1, 0, 1, 0].into();
1292+
let offsets = vec![0, 0, 1, 1, 2].into();
1293+
let children = vec![
1294+
Arc::new(int_array) as ArrayRef,
1295+
Arc::new(str_array) as ArrayRef,
1296+
];
1297+
1298+
let union_fields = [
1299+
(0, Arc::new(Field::new("a", DataType::Int32, false))),
1300+
(1, Arc::new(Field::new("b", DataType::Utf8, false))),
1301+
]
1302+
.into_iter()
1303+
.collect();
1304+
1305+
let array =
1306+
UnionArray::try_new(union_fields, type_ids, Some(offsets), children).unwrap();
1307+
let array_ref = Arc::new(array) as ArrayRef;
1308+
1309+
let random_state = RandomState::with_seeds(0, 0, 0, 0);
1310+
let mut hashes = vec![0; array_ref.len()];
1311+
create_hashes(&[array_ref], &random_state, &mut hashes).unwrap();
1312+
1313+
// 67 vs "norm"
1314+
assert_ne!(hashes[0], hashes[1]);
1315+
// 67 vs 100
1316+
assert_ne!(hashes[0], hashes[2]);
1317+
// "norm" vs "macdonald"
1318+
assert_ne!(hashes[1], hashes[3]);
1319+
// 100 vs "macdonald"
1320+
assert_ne!(hashes[2], hashes[3]);
1321+
// 67 vs 67
1322+
assert_eq!(hashes[0], hashes[4]);
1323+
}
11711324
}

0 commit comments

Comments
 (0)