173 changes: 71 additions & 102 deletions datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs
@@ -21,18 +21,18 @@ use std::{any::Any, fmt::Display, hash::Hash, sync::Arc};

use ahash::RandomState;
use arrow::{
array::{BooleanArray, UInt64Array},
buffer::MutableBuffer,
array::{ArrayRef, UInt64Array},
datatypes::{DataType, Schema},
util::bit_util,
record_batch::RecordBatch,
};
use datafusion_common::{Result, internal_datafusion_err, internal_err};
use datafusion_common::Result;
use datafusion_common::hash_utils::{create_hashes, with_hashes};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr_common::physical_expr::{
DynHash, PhysicalExpr, PhysicalExprRef,
};

use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType};
use crate::joins::utils::JoinHashMapType;

/// RandomState wrapper that preserves the seeds used to create it.
///
@@ -181,18 +181,11 @@ impl PhysicalExpr for HashExpr {
Ok(false)
}

fn evaluate(
&self,
batch: &arrow::record_batch::RecordBatch,
) -> Result<ColumnarValue> {
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
let num_rows = batch.num_rows();

// Evaluate columns
let keys_values = self
.on_columns
.iter()
.map(|c| c.evaluate(batch)?.into_array(num_rows))
.collect::<Result<Vec<_>>>()?;
let keys_values = evaluate_columns(&self.on_columns, batch)?;

// Compute hashes
let mut hashes_buffer = vec![0; num_rows];
@@ -217,8 +210,10 @@ impl PhysicalExpr for HashExpr {
/// Hashes a set of key columns and checks membership in a hash table.
/// Returns a BooleanArray indicating which rows' hashes exist.
pub struct HashTableLookupExpr {
/// Expression that computes hash values (should be a HashExpr)
hash_expr: PhysicalExprRef,
/// Columns to hash
on_columns: Vec<PhysicalExprRef>,
/// Random state for hashing (with seeds preserved for serialization)
random_state: SeededRandomState,
/// Hash table to check against
hash_map: Arc<dyn JoinHashMapType>,
/// Description for display
@@ -229,20 +224,23 @@ impl HashTableLookupExpr {
/// Create a new HashTableLookupExpr
///
/// # Arguments
/// * `hash_expr` - Expression that computes hash values
/// * `on_columns` - Columns to hash
/// * `random_state` - SeededRandomState for hashing
/// * `hash_map` - Hash table to check membership
/// * `description` - Description for debugging
///
/// # Note
/// This is public for internal testing purposes only and is not
/// guaranteed to be stable across versions.
pub fn new(
hash_expr: PhysicalExprRef,
on_columns: Vec<PhysicalExprRef>,
random_state: SeededRandomState,
hash_map: Arc<dyn JoinHashMapType>,
description: String,
) -> Self {
Self {
hash_expr,
on_columns,
random_state,
hash_map,
description,
}
@@ -251,14 +249,22 @@ impl HashTableLookupExpr {

impl std::fmt::Debug for HashTableLookupExpr {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}({:?})", self.description, self.hash_expr)
let cols = self
.on_columns
.iter()
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join(", ");
let (s1, s2, s3, s4) = self.random_state.seeds();
write!(f, "{}({cols}, [{s1},{s2},{s3},{s4}])", self.description)
}
}
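
The Debug output above prints the key columns alongside the four seed values. The point of preserving the seeds, rather than holding an opaque ahash::RandomState, is reproducibility: a lookup reconstructed on the probe side (or after serialization) hashes the join keys exactly as the build side did. A minimal sketch of that property, assuming only the with_seeds/seeds accessors used elsewhere in this diff plus ahash's RandomState::with_seeds and datafusion_common::hash_utils::create_hashes; the helper name is hypothetical:

fn seeds_make_hashing_reproducible() -> Result<()> {
    use arrow::array::Int32Array;

    // Two wrappers built from the same seeds report the same seeds, which is
    // what the Hash/PartialEq impls below compare.
    let build = SeededRandomState::with_seeds(1, 2, 3, 4);
    let probe = SeededRandomState::with_seeds(1, 2, 3, 4);
    assert_eq!(build.seeds(), probe.seeds());

    // ahash states created from those seeds hash equal keys to equal values,
    // so a probe-side lookup reproduces the build side's hashes.
    let keys: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
    let mut build_hashes = vec![0u64; keys.len()];
    let mut probe_hashes = vec![0u64; keys.len()];
    create_hashes(&[Arc::clone(&keys)], &RandomState::with_seeds(1, 2, 3, 4), &mut build_hashes)?;
    create_hashes(&[Arc::clone(&keys)], &RandomState::with_seeds(1, 2, 3, 4), &mut probe_hashes)?;
    assert_eq!(build_hashes, probe_hashes);
    Ok(())
}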

impl Hash for HashTableLookupExpr {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.hash_expr.dyn_hash(state);
self.on_columns.dyn_hash(state);
self.description.hash(state);
self.random_state.seeds().hash(state);
// Note that we compare hash_map by pointer equality.
// Actually comparing the contents of the hash maps would be expensive.
// The way these hash maps are used in actuality is that HashJoinExec creates
@@ -279,8 +285,9 @@ impl PartialEq for HashTableLookupExpr {
// hash maps to have the same content in practice.
// Theoretically this is a public API and users could create identical hash maps,
// but that seems unlikely and not worth paying the cost of deep comparison all the time.
self.hash_expr.as_ref() == other.hash_expr.as_ref()
self.on_columns == other.on_columns
&& self.description == other.description
&& self.random_state.seeds() == other.random_state.seeds()
&& Arc::ptr_eq(&self.hash_map, &other.hash_map)
}
}
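
To make the pointer-equality choice concrete: two lookups that share the same Arc'd hash map compare equal, while a map rebuilt with identical content does not, because only Arc identity is checked. An illustrative sketch (the helper name is hypothetical; Column and JoinHashMapU32 are the types the test module below uses, and the Column import path is an assumption):

fn arc_identity_drives_equality() {
    use datafusion_physical_expr::expressions::Column;

    let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
    let shared_map: Arc<dyn JoinHashMapType> =
        Arc::new(JoinHashMapU32::with_capacity(10));
    let rebuilt_map: Arc<dyn JoinHashMapType> =
        Arc::new(JoinHashMapU32::with_capacity(10));

    let make = |map: &Arc<dyn JoinHashMapType>| {
        HashTableLookupExpr::new(
            vec![Arc::clone(&col_a)],
            SeededRandomState::with_seeds(1, 2, 3, 4),
            Arc::clone(map),
            "lookup".to_string(),
        )
    };

    // Same Arc => equal; identical content behind a different Arc => not equal.
    assert_eq!(make(&shared_map), make(&shared_map));
    assert_ne!(make(&shared_map), make(&rebuilt_map));
}
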
@@ -299,21 +306,16 @@ impl PhysicalExpr for HashTableLookupExpr {
}

fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
vec![&self.hash_expr]
self.on_columns.iter().collect()
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn PhysicalExpr>>,
) -> Result<Arc<dyn PhysicalExpr>> {
if children.len() != 1 {
return internal_err!(
"HashTableLookupExpr expects exactly 1 child, got {}",
children.len()
);
}
Ok(Arc::new(HashTableLookupExpr::new(
Arc::clone(&children[0]),
children,
self.random_state.clone(),
Arc::clone(&self.hash_map),
self.description.clone(),
)))
@@ -327,43 +329,32 @@ impl PhysicalExpr for HashTableLookupExpr {
Ok(false)
}

fn evaluate(
&self,
batch: &arrow::record_batch::RecordBatch,
) -> Result<ColumnarValue> {
let num_rows = batch.num_rows();

// Evaluate hash expression to get hash values
let hash_array = self.hash_expr.evaluate(batch)?.into_array(num_rows)?;
let hash_array = hash_array.as_any().downcast_ref::<UInt64Array>().ok_or(
internal_datafusion_err!(
"HashTableLookupExpr expects UInt64Array from hash expression"
),
)?;

// Check each hash against the hash table
let mut buf = MutableBuffer::from_len_zeroed(bit_util::ceil(num_rows, 8));
for (idx, hash_value) in hash_array.values().iter().enumerate() {
// Use get_matched_indices to check - if it returns any indices, the hash exists
let (matched_indices, _) = self
.hash_map
.get_matched_indices(Box::new(std::iter::once((idx, hash_value))), None);

if !matched_indices.is_empty() {
bit_util::set_bit(buf.as_slice_mut(), idx);
}
}
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
// Evaluate columns
let keys_values = evaluate_columns(&self.on_columns, batch)?;

Ok(ColumnarValue::Array(Arc::new(
BooleanArray::new_from_packed(buf, 0, num_rows),
)))
with_hashes(&keys_values, self.random_state.random_state(), |hashes| {
let array = self.hash_map.contain_hashes(hashes);
Ok(ColumnarValue::Array(Arc::new(array)))
})
}

fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.description)
}
}

fn evaluate_columns(
columns: &[PhysicalExprRef],
batch: &RecordBatch,
) -> Result<Vec<ArrayRef>> {
let num_rows = batch.num_rows();
columns
.iter()
.map(|c| c.evaluate(batch)?.into_array(num_rows))
.collect()
}
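
Because HashTableLookupExpr is an ordinary boolean PhysicalExpr, a probe-side operator can evaluate it per batch and use the result as a filter mask. A sketch of that consumption, with hypothetical names (prefilter_probe_batch, lookup, probe_batch) and standard arrow-rs kernels:

use arrow::array::BooleanArray;
use arrow::compute::filter_record_batch;

/// `lookup` wraps a HashTableLookupExpr built for the join's probe-side key
/// columns; `probe_batch` is one batch of probe-side rows.
fn prefilter_probe_batch(
    lookup: &Arc<dyn PhysicalExpr>,
    probe_batch: &RecordBatch,
) -> Result<RecordBatch> {
    let mask = lookup
        .evaluate(probe_batch)?
        .into_array(probe_batch.num_rows())?;
    let mask = mask
        .as_any()
        .downcast_ref::<BooleanArray>()
        .expect("HashTableLookupExpr evaluates to a BooleanArray");
    // Keep only rows whose join-key hash appears in the build-side table.
    // False positives are possible (hash membership, not key equality);
    // false negatives are not, so no matching rows are dropped.
    Ok(filter_record_batch(probe_batch, mask)?)
}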

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -482,22 +473,19 @@ mod tests {
#[test]
fn test_hash_table_lookup_expr_eq_same() {
let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
"inner_hash".to_string(),
));
let hash_map: Arc<dyn JoinHashMapType> =
Arc::new(JoinHashMapU32::with_capacity(10));

let expr1 = HashTableLookupExpr::new(
Arc::clone(&hash_expr),
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
Arc::clone(&hash_map),
"lookup".to_string(),
);

let expr2 = HashTableLookupExpr::new(
Arc::clone(&hash_expr),
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
Arc::clone(&hash_map),
"lookup".to_string(),
);
@@ -506,33 +494,23 @@
}

#[test]
fn test_hash_table_lookup_expr_eq_different_hash_expr() {
fn test_hash_table_lookup_expr_eq_different_columns() {
let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1));

let hash_expr1: PhysicalExprRef = Arc::new(HashExpr::new(
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
"inner_hash".to_string(),
));

let hash_expr2: PhysicalExprRef = Arc::new(HashExpr::new(
vec![Arc::clone(&col_b)],
SeededRandomState::with_seeds(1, 2, 3, 4),
"inner_hash".to_string(),
));

let hash_map: Arc<dyn JoinHashMapType> =
Arc::new(JoinHashMapU32::with_capacity(10));

let expr1 = HashTableLookupExpr::new(
Arc::clone(&hash_expr1),
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
Arc::clone(&hash_map),
"lookup".to_string(),
);

let expr2 = HashTableLookupExpr::new(
Arc::clone(&hash_expr2),
vec![Arc::clone(&col_b)],
SeededRandomState::with_seeds(1, 2, 3, 4),
Arc::clone(&hash_map),
"lookup".to_string(),
);
@@ -543,22 +521,19 @@
#[test]
fn test_hash_table_lookup_expr_eq_different_description() {
let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
"inner_hash".to_string(),
));
let hash_map: Arc<dyn JoinHashMapType> =
Arc::new(JoinHashMapU32::with_capacity(10));

let expr1 = HashTableLookupExpr::new(
Arc::clone(&hash_expr),
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
Arc::clone(&hash_map),
"lookup_one".to_string(),
);

let expr2 = HashTableLookupExpr::new(
Arc::clone(&hash_expr),
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
Arc::clone(&hash_map),
"lookup_two".to_string(),
);
@@ -569,11 +544,6 @@
#[test]
fn test_hash_table_lookup_expr_eq_different_hash_map() {
let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
"inner_hash".to_string(),
));

// Two different Arc pointers (even with same content) should not be equal
let hash_map1: Arc<dyn JoinHashMapType> =
@@ -582,13 +552,15 @@
Arc::new(JoinHashMapU32::with_capacity(10));

let expr1 = HashTableLookupExpr::new(
Arc::clone(&hash_expr),
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
hash_map1,
"lookup".to_string(),
);

let expr2 = HashTableLookupExpr::new(
Arc::clone(&hash_expr),
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
hash_map2,
"lookup".to_string(),
);
@@ -600,22 +572,19 @@
#[test]
fn test_hash_table_lookup_expr_hash_consistency() {
let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0));
let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new(
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
"inner_hash".to_string(),
));
let hash_map: Arc<dyn JoinHashMapType> =
Arc::new(JoinHashMapU32::with_capacity(10));

let expr1 = HashTableLookupExpr::new(
Arc::clone(&hash_expr),
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
Arc::clone(&hash_map),
"lookup".to_string(),
);

let expr2 = HashTableLookupExpr::new(
Arc::clone(&hash_expr),
vec![Arc::clone(&col_a)],
SeededRandomState::with_seeds(1, 2, 3, 4),
Arc::clone(&hash_map),
"lookup".to_string(),
);

@@ -129,14 +129,9 @@ fn create_membership_predicate(
}
// Use hash table lookup for large build sides
PushdownStrategy::HashTable(hash_map) => {
let lookup_hash_expr = Arc::new(HashExpr::new(
Ok(Some(Arc::new(HashTableLookupExpr::new(
on_right.to_vec(),
random_state.clone(),
"hash_join".to_string(),
)) as Arc<dyn PhysicalExpr>;

Ok(Some(Arc::new(HashTableLookupExpr::new(
lookup_hash_expr,
hash_map,
"hash_lookup".to_string(),
)) as Arc<dyn PhysicalExpr>))
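
For readers following the interleaved old and new lines above: after this change the HashTable arm builds the probe-side membership predicate in a single step, with no intermediate HashExpr. A hypothetical standalone equivalent (the function name and parameter list are illustrative, not the actual signature of create_membership_predicate):

fn hash_table_membership_predicate(
    on_right: &[PhysicalExprRef],
    random_state: &SeededRandomState,
    hash_map: Arc<dyn JoinHashMapType>,
) -> Arc<dyn PhysicalExpr> {
    // The lookup hashes the probe-side key columns itself with the preserved
    // seeds, then probes the build-side hash table for membership.
    Arc::new(HashTableLookupExpr::new(
        on_right.to_vec(),
        random_state.clone(),
        hash_map,
        "hash_lookup".to_string(),
    ))
}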