From 35018b6c894464212b9e7afefb42ccacbb86792f Mon Sep 17 00:00:00 2001 From: lvlongxiang Date: Sat, 27 Dec 2025 10:08:15 +0000 Subject: [PATCH] feat: support perfect hash join --- Cargo.lock | 1 + benchmarks/bench.sh | 6 +- benchmarks/compare.py | 20 +- benchmarks/src/hj.rs | 410 +++++--- datafusion/common/src/config.rs | 19 + datafusion/physical-plan/Cargo.toml | 1 + .../physical-plan/src/joins/array_map.rs | 615 ++++++++++++ datafusion/physical-plan/src/joins/chain.rs | 69 ++ .../physical-plan/src/joins/hash_join/exec.rs | 938 ++++++++++++++---- .../joins/hash_join/partitioned_hash_eval.rs | 145 +-- .../src/joins/hash_join/shared_bounds.rs | 10 +- .../src/joins/hash_join/stream.rs | 66 +- .../physical-plan/src/joins/join_hash_map.rs | 66 +- datafusion/physical-plan/src/joins/mod.rs | 27 + .../src/joins/stream_join_utils.rs | 8 +- .../tests/cases/roundtrip_physical_plan.rs | 14 +- .../test_files/information_schema.slt | 4 + docs/source/user-guide/configs.md | 2 + 18 files changed, 1936 insertions(+), 485 deletions(-) create mode 100644 datafusion/physical-plan/src/joins/array_map.rs create mode 100644 datafusion/physical-plan/src/joins/chain.rs diff --git a/Cargo.lock b/Cargo.lock index 1675f26e8a0f0..aef29c24f6e9b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2584,6 +2584,7 @@ dependencies = [ "insta", "itertools 0.14.0", "log", + "num-traits", "parking_lot", "pin-project-lite", "rand 0.9.2", diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index d5fa52d7f00ee..e7f643a5d51d5 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -322,8 +322,7 @@ main() { echo "NLJ benchmark does not require data generation" ;; hj) - # hj uses range() function, no data generation needed - echo "HJ benchmark does not require data generation" + data_tpch "10" "parquet" ;; smj) # smj uses range() function, no data generation needed @@ -1228,10 +1227,11 @@ run_nlj() { # Runs the hj benchmark run_hj() { + TPCH_DIR="${DATA_DIR}/tpch_sf10" RESULTS_FILE="${RESULTS_DIR}/hj.json" echo "RESULTS_FILE: ${RESULTS_FILE}" echo "Running hj benchmark..." - debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 -o "${RESULTS_FILE}" ${QUERY_ARG} + debug_run $CARGO_COMMAND --bin dfbench -- hj --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" ${QUERY_ARG} } # Runs the smj benchmark diff --git a/benchmarks/compare.py b/benchmarks/compare.py index 7e51a38a92c2b..9ad1de980abe8 100755 --- a/benchmarks/compare.py +++ b/benchmarks/compare.py @@ -154,17 +154,17 @@ def compare( baseline = BenchmarkRun.load_from_file(baseline_path) comparison = BenchmarkRun.load_from_file(comparison_path) - console = Console() + console = Console(width=200) # use basename as the column names - baseline_header = baseline_path.parent.stem - comparison_header = comparison_path.parent.stem + baseline_header = baseline_path.parent.name + comparison_header = comparison_path.parent.name table = Table(show_header=True, header_style="bold magenta") - table.add_column("Query", style="dim", width=12) - table.add_column(baseline_header, justify="right", style="dim") - table.add_column(comparison_header, justify="right", style="dim") - table.add_column("Change", justify="right", style="dim") + table.add_column("Query", style="dim", no_wrap=True) + table.add_column(baseline_header, justify="right", style="dim", no_wrap=True) + table.add_column(comparison_header, justify="right", style="dim", no_wrap=True) + table.add_column("Change", justify="right", style="dim", no_wrap=True) faster_count = 0 slower_count = 0 @@ -175,12 +175,12 @@ def compare( for baseline_result, comparison_result in zip(baseline.queries, comparison.queries): assert baseline_result.query == comparison_result.query - + base_failed = not baseline_result.success - comp_failed = not comparison_result.success + comp_failed = not comparison_result.success # If a query fails, its execution time is excluded from the performance comparison if base_failed or comp_failed: - change_text = "incomparable" + change_text = "incomparable" failure_count += 1 table.add_row( f"Q{baseline_result.query}", diff --git a/benchmarks/src/hj.rs b/benchmarks/src/hj.rs index 562047f615bc8..9dbfac158914a 100644 --- a/benchmarks/src/hj.rs +++ b/benchmarks/src/hj.rs @@ -20,6 +20,7 @@ use datafusion::physical_plan::execute_stream; use datafusion::{error::Result, prelude::SessionContext}; use datafusion_common::instant::Instant; use datafusion_common::{DataFusionError, exec_datafusion_err, exec_err}; +use std::path::PathBuf; use structopt::StructOpt; use futures::StreamExt; @@ -35,7 +36,7 @@ use futures::StreamExt; #[derive(Debug, StructOpt, Clone)] #[structopt(verbatim_doc_comment)] pub struct RunOpt { - /// Query number (between 1 and 12). If not specified, runs all queries + /// Query number. If not specified, runs all queries #[structopt(short, long)] query: Option, @@ -43,128 +44,265 @@ pub struct RunOpt { #[structopt(flatten)] common: CommonOpt, + /// Path to TPC-H SF10 data + #[structopt(parse(from_os_str), short = "p", long = "path")] + path: Option, + /// If present, write results json here #[structopt(parse(from_os_str), short = "o", long = "output")] - output_path: Option, + output_path: Option, +} + +struct HashJoinQuery { + sql: &'static str, + density: f64, + prob_hit: f64, + build_size: &'static str, + probe_size: &'static str, } /// Inline SQL queries for Hash Join benchmarks -/// -/// Each query's comment includes: -/// - Left row count × Right row count -/// - Join predicate selectivity (approximate output fraction). -/// - Q11 and Q12 selectivity is relative to cartesian product while the others are -/// relative to probe side. -const HASH_QUERIES: &[&str] = &[ - // Q1: INNER 10 x 10K | LOW ~0.1% - // equality on key + cheap filter to downselect - r#" - SELECT t1.value, t2.value - FROM generate_series(0, 9000, 1000) AS t1(value) - JOIN range(10000) AS t2 - ON t1.value = t2.value; - "#, - // Q2: INNER 10 x 10K | LOW ~0.1% - r#" - SELECT t1.value, t2.value - FROM generate_series(0, 9000, 1000) AS t1 - JOIN range(10000) AS t2 - ON t1.value = t2.value - WHERE t1.value % 5 = 0 - "#, - // Q3: INNER 10K x 10K | HIGH ~90% - r#" - SELECT t1.value, t2.value - FROM range(10000) AS t1 - JOIN range(10000) AS t2 - ON t1.value = t2.value - WHERE t1.value % 10 <> 0 - "#, - // Q4: INNER 30 x 30K | LOW ~0.1% - r#" - SELECT t1.value, t2.value - FROM generate_series(0, 29000, 1000) AS t1 - JOIN range(30000) AS t2 - ON t1.value = t2.value - WHERE t1.value % 5 = 0 - "#, - // Q5: INNER 10 x 200K | VERY LOW ~0.005% (small to large) - r#" - SELECT t1.value, t2.value - FROM generate_series(0, 9000, 1000) AS t1 - JOIN range(200000) AS t2 - ON t1.value = t2.value - WHERE t1.value % 1000 = 0 - "#, - // Q6: INNER 200K x 10 | VERY LOW ~0.005% (large to small) - r#" - SELECT t1.value, t2.value - FROM range(200000) AS t1 - JOIN generate_series(0, 9000, 1000) AS t2 - ON t1.value = t2.value - WHERE t1.value % 1000 = 0 - "#, - // Q7: RIGHT OUTER 10 x 200K | LOW ~0.1% - // Outer join still uses HashJoin for equi-keys; the extra filter reduces matches - r#" - SELECT t1.value AS l, t2.value AS r - FROM generate_series(0, 9000, 1000) AS t1 - RIGHT JOIN range(200000) AS t2 - ON t1.value = t2.value - WHERE t2.value % 1000 = 0 - "#, - // Q8: LEFT OUTER 200K x 10 | LOW ~0.1% - r#" - SELECT t1.value AS l, t2.value AS r - FROM range(200000) AS t1 - LEFT JOIN generate_series(0, 9000, 1000) AS t2 - ON t1.value = t2.value - WHERE t1.value % 1000 = 0 - "#, - // Q9: FULL OUTER 30 x 30K | LOW ~0.1% - r#" - SELECT t1.value AS l, t2.value AS r - FROM generate_series(0, 29000, 1000) AS t1 - FULL JOIN range(30000) AS t2 - ON t1.value = t2.value - WHERE COALESCE(t1.value, t2.value) % 1000 = 0 - "#, - // Q10: FULL OUTER 30 x 30K | HIGH ~90% - r#" - SELECT t1.value AS l, t2.value AS r - FROM generate_series(0, 29000, 1000) AS t1 - FULL JOIN range(30000) AS t2 - ON t1.value = t2.value - WHERE COALESCE(t1.value, t2.value) % 10 <> 0 - "#, - // Q11: INNER 30 x 30K | MEDIUM ~50% | cheap predicate on parity - r#" - SELECT t1.value, t2.value - FROM generate_series(0, 29000, 1000) AS t1 - INNER JOIN range(30000) AS t2 - ON (t1.value % 2) = (t2.value % 2) - "#, - // Q12: FULL OUTER 30 x 30K | MEDIUM ~50% | expression key - r#" - SELECT t1.value AS l, t2.value AS r - FROM generate_series(0, 29000, 1000) AS t1 - FULL JOIN range(30000) AS t2 - ON (t1.value % 2) = (t2.value % 2) - "#, - // Q13: INNER 30 x 30K | LOW 0.1% | modulo with adding values - r#" - SELECT t1.value, t2.value - FROM generate_series(0, 29000, 1000) AS t1 - INNER JOIN range(30000) AS t2 - ON (t1.value = t2.value) AND ((t1.value + t2.value) % 10 < 1) - "#, - // Q14: FULL OUTER 30 x 30K | ALL ~100% | modulo - r#" - SELECT t1.value AS l, t2.value AS r - FROM generate_series(0, 29000, 1000) AS t1 - FULL JOIN range(30000) AS t2 - ON (t1.value = t2.value) AND ((t1.value + t2.value) % 10 = 0) - "#, +const HASH_QUERIES: &[HashJoinQuery] = &[ + // Q1: Very Small Build Side (Dense) + // Build Side: nation (25 rows) | Probe Side: customer (1.5M rows) + HashJoinQuery { + sql: r###"SELECT n_nationkey FROM nation JOIN customer ON c_nationkey = n_nationkey"###, + density: 1.0, + prob_hit: 1.0, + build_size: "25", + probe_size: "1.5M", + }, + // Q2: Very Small Build Side (Sparse, range < 1024) + // Build Side: nation (25 rows, range 961) | Probe Side: customer (1.5M rows) + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT c_nationkey * 40 as k + FROM customer + ) l + JOIN ( + SELECT n_nationkey * 40 as k FROM nation + ) s ON l.k = s.k"###, + density: 0.026, + prob_hit: 1.0, + build_size: "25", + probe_size: "1.5M", + }, + // Q3: 100% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT s_suppkey FROM supplier JOIN lineitem ON s_suppkey = l_suppkey"###, + density: 1.0, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M", + }, + // Q4: 100% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CASE WHEN l_suppkey % 10 = 0 THEN l_suppkey ELSE l_suppkey + 1000000 END as k + FROM lineitem + ) l + JOIN ( + SELECT s_suppkey as k FROM supplier + ) s ON l.k = s.k"###, + density: 1.0, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M", + }, + // Q5: 75% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT l_suppkey * 4 / 3 as k + FROM lineitem + ) l + JOIN ( + SELECT s_suppkey * 4 / 3 as k FROM supplier + ) s ON l.k = s.k"###, + density: 0.75, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M", + }, + // Q6: 75% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 4 / 3 + WHEN l_suppkey % 10 < 9 THEN (l_suppkey * 4 / 3 / 4) * 4 + 3 + ELSE l_suppkey * 4 / 3 + 1000000 + END as k + FROM lineitem + ) l + JOIN ( + SELECT s_suppkey * 4 / 3 as k FROM supplier + ) s ON l.k = s.k"###, + density: 0.75, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M", + }, + // Q7: 50% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT l_suppkey * 2 as k + FROM lineitem + ) l + JOIN ( + SELECT s_suppkey * 2 as k FROM supplier + ) s ON l.k = s.k"###, + density: 0.5, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M", + }, + // Q8: 50% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 2 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 2 + 1 + ELSE l_suppkey * 2 + 1000000 + END as k + FROM lineitem + ) l + JOIN ( + SELECT s_suppkey * 2 as k FROM supplier + ) s ON l.k = s.k"###, + density: 0.5, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M", + }, + // Q9: 20% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT l_suppkey * 5 as k + FROM lineitem + ) l + JOIN ( + SELECT s_suppkey * 5 as k FROM supplier + ) s ON l.k = s.k"###, + density: 0.2, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M", + }, + // Q10: 20% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 5 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 5 + 1 + ELSE l_suppkey * 5 + 1000000 + END as k + FROM lineitem + ) l + JOIN ( + SELECT s_suppkey * 5 as k FROM supplier + ) s ON l.k = s.k"###, + density: 0.2, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M", + }, + // Q11: 10% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT l_suppkey * 10 as k + FROM lineitem + ) l + JOIN ( + SELECT s_suppkey * 10 as k FROM supplier + ) s ON l.k = s.k"###, + density: 0.1, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M", + }, + // Q12: 10% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 10 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 10 + 1 + ELSE l_suppkey * 10 + 1000000 + END as k + FROM lineitem + ) l + JOIN ( + SELECT s_suppkey * 10 as k FROM supplier + ) s ON l.k = s.k"###, + density: 0.1, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M", + }, + // Q13: 1% Density, 100% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT l_suppkey * 100 as k + FROM lineitem + ) l + JOIN ( + SELECT s_suppkey * 100 as k FROM supplier + ) s ON l.k = s.k"###, + density: 0.01, + prob_hit: 1.0, + build_size: "100K", + probe_size: "60M", + }, + // Q14: 1% Density, 10% Hit rate + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CASE + WHEN l_suppkey % 10 = 0 THEN l_suppkey * 100 + WHEN l_suppkey % 10 < 9 THEN l_suppkey * 100 + 1 + ELSE l_suppkey * 100 + 11000000 -- oob + END as k + FROM lineitem + ) l + JOIN ( + SELECT s_suppkey * 100 as k FROM supplier + ) s ON l.k = s.k"###, + density: 0.01, + prob_hit: 0.1, + build_size: "100K", + probe_size: "60M", + }, + // Q15: 20% Density, 10% Hit rate, 20% Duplicates in Build Side + HashJoinQuery { + sql: r###"SELECT l.k + FROM ( + SELECT CASE + WHEN l_suppkey % 10 = 0 THEN ((l_suppkey % 80000) + 1) * 25 / 4 + ELSE ((l_suppkey % 80000) + 1) * 25 / 4 + 1 + END as k + FROM lineitem + ) l + JOIN ( + SELECT CASE + WHEN s_suppkey <= 80000 THEN (s_suppkey * 25) / 4 + ELSE ((s_suppkey - 80000) * 25) / 4 + END as k + FROM supplier + ) s ON l.k = s.k"###, + density: 0.2, + prob_hit: 0.1, + build_size: "100K_(20%_dups)", + probe_size: "60M", + }, ]; impl RunOpt { @@ -189,14 +327,44 @@ impl RunOpt { let rt_builder = self.common.runtime_env_builder()?; let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?); + if let Some(path) = &self.path { + for table in &["lineitem", "supplier", "nation", "customer"] { + let table_path = path.join(table); + if !table_path.exists() { + return exec_err!( + "TPC-H table {} not found at {:?}", + table, + table_path + ); + } + ctx.register_parquet( + *table, + table_path.to_str().unwrap(), + Default::default(), + ) + .await?; + } + } + let mut benchmark_run = BenchmarkRun::new(); for query_id in query_range { let query_index = query_id - 1; - let sql = HASH_QUERIES[query_index]; + let query = &HASH_QUERIES[query_index]; + + let case_name = format!( + "Query {}_density={}_prob_hit={}_{}*{}", + query_id, + query.density, + query.prob_hit, + query.build_size, + query.probe_size + ); + benchmark_run.start_new_case(&case_name); - benchmark_run.start_new_case(&format!("Query {query_id}")); - let query_run = self.benchmark_query(sql, &query_id.to_string(), &ctx).await; + let query_run = self + .benchmark_query(query.sql, &query_id.to_string(), &ctx) + .await; match query_run { Ok(query_results) => { for iter in query_results { diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 2bea2ec5a4526..f2d682226030b 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -468,6 +468,25 @@ config_namespace! { /// metadata memory consumption pub batch_size: usize, default = 8192 + /// A perfect hash join (see `HashJoinExec` for more details) will be considered + /// if the range of keys (max - min) on the build side is < this threshold. + /// This provides a fast path for joins with very small key ranges, + /// bypassing the density check. + /// + /// Currently only supports cases where build_side.num_rows() < u32::MAX. + /// Support for build_side.num_rows() >= u32::MAX will be added in the future. + pub perfect_hash_join_small_build_threshold: usize, default = 1024 + + /// The minimum required density of join keys on the build side to consider a + /// perfect hash join (see `HashJoinExec` for more details). Density is calculated as: + /// `(number of rows) / (max_key - min_key + 1)`. + /// A perfect hash join may be used if the actual key density > this + /// value. + /// + /// Currently only supports cases where build_side.num_rows() < u32::MAX. + /// Support for build_side.num_rows() >= u32::MAX will be added in the future. + pub perfect_hash_join_min_key_density: f64, default = 0.99 + /// When set to true, record batches will be examined between each operator and /// small batches will be coalesced into larger batches. This is helpful when there /// are highly selective filters or joins that could produce tiny output batches. The diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 68e67fa018f08..13f91fd7d4ea2 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -67,6 +67,7 @@ hashbrown = { workspace = true } indexmap = { workspace = true } itertools = { workspace = true, features = ["use_std"] } log = { workspace = true } +num-traits = { workspace = true } parking_lot = { workspace = true } pin-project-lite = "^0.2.7" tokio = { workspace = true } diff --git a/datafusion/physical-plan/src/joins/array_map.rs b/datafusion/physical-plan/src/joins/array_map.rs new file mode 100644 index 0000000000000..3e20e0c8d0d0b --- /dev/null +++ b/datafusion/physical-plan/src/joins/array_map.rs @@ -0,0 +1,615 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::buffer::MutableBuffer; +use arrow_schema::DataType; +use num_traits::AsPrimitive; +use std::mem::size_of; + +use crate::joins::MapOffset; +use crate::joins::chain::traverse_chain; +use arrow::array::{Array, ArrayRef, AsArray}; +use arrow::datatypes::ArrowNumericType; +use datafusion_common::{Result, ScalarValue, internal_err}; + +/// A macro to downcast only supported integer types (up to 64-bit) and invoke a generic function. +/// +/// Usage: `downcast_supported_integer!(data_type => (Method, arg1, arg2, ...))` +/// +/// The `Method` must be an associated method of [`ArrayMap`] that is generic over +/// `` and allow `T::Native: AsPrimitive`. +macro_rules! downcast_supported_integer { + ($DATA_TYPE:expr => ($METHOD:ident $(, $ARGS:expr)*)) => { + match $DATA_TYPE { + arrow::datatypes::DataType::Int8 => ArrayMap::$METHOD::($($ARGS),*), + arrow::datatypes::DataType::Int16 => ArrayMap::$METHOD::($($ARGS),*), + arrow::datatypes::DataType::Int32 => ArrayMap::$METHOD::($($ARGS),*), + arrow::datatypes::DataType::Int64 => ArrayMap::$METHOD::($($ARGS),*), + arrow::datatypes::DataType::UInt8 => ArrayMap::$METHOD::($($ARGS),*), + arrow::datatypes::DataType::UInt16 => ArrayMap::$METHOD::($($ARGS),*), + arrow::datatypes::DataType::UInt32 => ArrayMap::$METHOD::($($ARGS),*), + arrow::datatypes::DataType::UInt64 => ArrayMap::$METHOD::($($ARGS),*), + _ => { + return internal_err!( + "Unsupported type for ArrayMap: {:?}", + $DATA_TYPE + ); + } + } + }; +} + +/// A dense map for single-column integer join keys within a limited range. +/// +/// Maps join keys to build-side indices using direct array indexing: +/// `data[val - min_val_in_build_side] -> val_idx_in_build_side + 1`. +/// +/// NULL values are ignored on both the build side and the probe side. +/// +/// # Handling Negative Numbers with `wrapping_sub` +/// +/// This implementation supports signed integer ranges (e.g., `[-5, 5]`) efficiently by +/// treating them as `u64` (Two's Complement) and relying on the bitwise properties of +/// wrapping arithmetic (`wrapping_sub`). +/// +/// In Two's Complement representation, `a_signed - b_signed` produces the same bit pattern +/// as `a_unsigned.wrapping_sub(b_unsigned)` (modulo 2^N). This allows us to perform +/// range calculations and zero-based index mapping uniformly for both signed and unsigned +/// types without branching. +/// +/// ## Examples +/// +/// Consider an `Int64` range `[-5, 5]`. +/// * `min_val (-5)` casts to `u64`: `...11111011` (`u64::MAX - 4`) +/// * `max_val (5)` casts to `u64`: `...00000101` (`5`) +/// +/// **1. Range Calculation** +/// +/// ```text +/// In modular arithmetic, this is equivalent to: +/// (5 - (2^64 - 5)) mod 2^64 +/// = (5 - 2^64 + 5) mod 2^64 +/// = (10 - 2^64) mod 2^64 +/// = 10 +/// +/// ``` +/// The resulting `range` (10) correctly represents the size of the interval `[-5, 5]`. +/// +/// **2. Index Lookup (in `get_matched_indices`)** +/// +/// For a probe value of `0` (which is stored as `0u64`): +/// ```text +/// In modular arithmetic, this is equivalent to: +/// (0 - (2^64 - 5)) mod 2^64 +/// = (-2^64 + 5) mod 2^64 +/// = 5 +/// ``` +/// This correctly maps `-5` to index `0`, `0` to index `5`, etc. +#[derive(Debug)] +pub struct ArrayMap { + // data[probSideVal-offset] -> valIdxInBuildSide + 1; 0 for absent + data: Vec, + // min val in buildSide + offset: u64, + // next[buildSideIdx] -> next matching valIdxInBuildSide + 1; 0 for end of chain. + // If next is empty, it means there are no duplicate keys (no conflicts). + // It uses the same chain-based conflict resolution as [`JoinHashMapType`]. + next: Vec, + num_of_distinct_key: usize, +} + +impl ArrayMap { + pub fn is_supported_type(data_type: &DataType) -> bool { + matches!( + data_type, + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + ) + } + + pub(crate) fn key_to_u64(v: &ScalarValue) -> Option { + match v { + ScalarValue::Int8(Some(v)) => Some(*v as u64), + ScalarValue::Int16(Some(v)) => Some(*v as u64), + ScalarValue::Int32(Some(v)) => Some(*v as u64), + ScalarValue::Int64(Some(v)) => Some(*v as u64), + ScalarValue::UInt8(Some(v)) => Some(*v as u64), + ScalarValue::UInt16(Some(v)) => Some(*v as u64), + ScalarValue::UInt32(Some(v)) => Some(*v as u64), + ScalarValue::UInt64(Some(v)) => Some(*v), + _ => None, + } + } + + /// Estimates the maximum memory usage for an `ArrayMap` with the given parameters. + /// + pub fn estimate_memory_size(min_val: u64, max_val: u64, num_rows: usize) -> usize { + let range = Self::calculate_range(min_val, max_val); + let size = (range + 1) as usize; + size * size_of::() + num_rows * size_of::() + } + + pub fn calculate_range(min_val: u64, max_val: u64) -> u64 { + max_val.wrapping_sub(min_val) + } + + /// Creates a new [`ArrayMap`] from the given array of join keys. + /// + /// Note: This function processes only the non-null values in the input `array`, + /// ignoring any rows where the key is `NULL`. + /// + pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> Result { + let range = max_val.wrapping_sub(min_val); + let size = (range + 1) as usize; + + let mut data: Vec = vec![0; size]; + let mut next: Vec = vec![]; + let mut num_of_distinct_key = 0; + + downcast_supported_integer!( + array.data_type() => ( + fill_data, + array, + min_val, + &mut data, + &mut next, + &mut num_of_distinct_key + ) + )?; + + Ok(Self { + data, + offset: min_val, + next, + num_of_distinct_key, + }) + } + + fn fill_data( + array: &ArrayRef, + offset_val: u64, + data: &mut [u32], + next: &mut Vec, + num_of_distinct_key: &mut usize, + ) -> Result<()> + where + T::Native: AsPrimitive, + { + let arr = array.as_primitive::(); + // Iterate in reverse to maintain FIFO order when there are duplicate keys. + for (i, val) in arr.iter().enumerate().rev() { + if let Some(val) = val { + let key: u64 = val.as_(); + let idx = key.wrapping_sub(offset_val) as usize; + if idx >= data.len() { + return internal_err!("failed build Array idx >= data.len()"); + } + + if data[idx] != 0 { + if next.is_empty() { + *next = vec![0; array.len()] + } + next[i] = data[idx] + } else { + *num_of_distinct_key += 1; + } + data[idx] = (i) as u32 + 1; + } + } + Ok(()) + } + + pub fn num_of_distinct_key(&self) -> usize { + self.num_of_distinct_key + } + + /// Returns the memory usage of this [`ArrayMap`] in bytes. + pub fn size(&self) -> usize { + self.data.capacity() * size_of::() + self.next.capacity() * size_of::() + } + + pub fn get_matched_indices_with_limit_offset( + &self, + prob_side_keys: &[ArrayRef], + limit: usize, + current_offset: MapOffset, + probe_indices: &mut Vec, + build_indices: &mut Vec, + ) -> Result> { + if prob_side_keys.len() != 1 { + return internal_err!( + "ArrayMap expects 1 join key, but got {}", + prob_side_keys.len() + ); + } + let array = &prob_side_keys[0]; + + downcast_supported_integer!( + array.data_type() => ( + lookup_and_get_indices, + self, + array, + limit, + current_offset, + probe_indices, + build_indices + ) + ) + } + + fn lookup_and_get_indices( + &self, + array: &ArrayRef, + limit: usize, + current_offset: MapOffset, + probe_indices: &mut Vec, + build_indices: &mut Vec, + ) -> Result> + where + T::Native: Copy + AsPrimitive, + { + probe_indices.clear(); + build_indices.clear(); + + let arr = array.as_primitive::(); + + let have_null = arr.null_count() > 0; + + if self.next.is_empty() { + for prob_idx in current_offset.0..arr.len() { + if build_indices.len() == limit { + return Ok(Some((prob_idx, None))); + } + + // short circuit + if have_null && arr.is_null(prob_idx) { + continue; + } + // SAFETY: prob_idx is guaranteed to be within bounds by the loop range. + let prob_val: u64 = unsafe { arr.value_unchecked(prob_idx) }.as_(); + let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize; + + if idx_in_build_side >= self.data.len() + || self.data[idx_in_build_side] == 0 + { + continue; + } + build_indices.push((self.data[idx_in_build_side] - 1) as u64); + probe_indices.push(prob_idx as u32); + } + Ok(None) + } else { + let mut remaining_output = limit; + let to_skip = match current_offset { + // None `initial_next_idx` indicates that `initial_idx` processing hasn't been started + (idx, None) => idx, + // Zero `initial_next_idx` indicates that `initial_idx` has been processed during + // previous iteration, and it should be skipped + (idx, Some(0)) => idx + 1, + // Otherwise, process remaining `initial_idx` matches by traversing `next_chain`, + // to start with the next index + (idx, Some(next_idx)) => { + let is_last = idx == arr.len() - 1; + if let Some(next_offset) = traverse_chain( + &self.next, + idx, + next_idx as u32, + &mut remaining_output, + probe_indices, + build_indices, + is_last, + ) { + return Ok(Some(next_offset)); + } + idx + 1 + } + }; + + for prob_side_idx in to_skip..arr.len() { + if remaining_output == 0 { + return Ok(Some((prob_side_idx, None))); + } + + if arr.is_null(prob_side_idx) { + continue; + } + + let is_last = prob_side_idx == arr.len() - 1; + + // SAFETY: prob_idx is guaranteed to be within bounds by the loop range. + let prob_val: u64 = unsafe { arr.value_unchecked(prob_side_idx) }.as_(); + let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize; + if idx_in_build_side >= self.data.len() + || self.data[idx_in_build_side] == 0 + { + continue; + } + + let build_idx = self.data[idx_in_build_side]; + + if let Some(offset) = traverse_chain( + &self.next, + prob_side_idx, + build_idx, + &mut remaining_output, + probe_indices, + build_indices, + is_last, + ) { + return Ok(Some(offset)); + } + } + Ok(None) + } + } + + pub fn mark_existing_probes( + &self, + probe_side_keys: &[ArrayRef], + buf: &mut MutableBuffer, + ) -> Result<()> { + if probe_side_keys.len() != 1 { + return internal_err!( + "ArrayMap join expects 1 join key, but got {}", + probe_side_keys.len() + ); + } + let array = &probe_side_keys[0]; + + downcast_supported_integer!( + array.data_type() => ( + mark_existing_probes_helper, + self, + array, + buf + ) + ); + Ok(()) + } + + fn mark_existing_probes_helper( + &self, + array: &ArrayRef, + buf: &mut MutableBuffer, + ) where + T::Native: AsPrimitive, + { + let arr = array.as_primitive::(); + for (i, val) in arr.iter().enumerate() { + if let Some(val) = val { + let key: u64 = val.as_(); + let idx = key.wrapping_sub(self.offset) as usize; + if idx < self.data.len() && self.data[idx] != 0 { + arrow::util::bit_util::set_bit(buf.as_slice_mut(), i); + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::Int32Array; + use arrow::array::Int64Array; + use std::sync::Arc; + + #[test] + fn test_array_map_limit_offset_duplicate_elements() -> Result<()> { + // Key 5: idx 0, 3, 6 + // Key 10: idx 1, 4 + // Key 15: idx 2, 5 + // Key 7: idx 7 + // Key 8: idx 8 + let build_array: ArrayRef = + Arc::new(Int32Array::from(vec![5, 10, 15, 5, 10, 15, 5, 7, 8])); + let array_map = ArrayMap::try_new(&build_array, 5, 15)?; + + let probe_array: ArrayRef = Arc::new(Int32Array::from(vec![5, 10, 15, 7, 8, 9])); + let prob_side_keys = [Arc::clone(&probe_array)]; + + let mut prob_indices = Vec::new(); + let mut build_indices = Vec::new(); + let mut current_offset = (0, None); + let batch_size = 2; + + // First call: Should get 2 matches for probe key 5 + let result_offset = array_map.get_matched_indices_with_limit_offset( + &prob_side_keys, + batch_size, + current_offset, + &mut prob_indices, + &mut build_indices, + )?; + + assert_eq!(prob_indices.len(), 2); + assert_eq!(build_indices.len(), 2); + assert_eq!(prob_indices, vec![0, 0]); + assert_eq!(build_indices, vec![0, 3]); + + // Offset to resume at 7 (index 6) for probe 0 + assert_eq!(result_offset, Some((0, Some(7)))); + + // Second call: Should get the last match for probe key 5, then one match for key 10 + current_offset = result_offset.unwrap(); + + let result_offset = array_map.get_matched_indices_with_limit_offset( + &prob_side_keys, + batch_size, + current_offset, + &mut prob_indices, + &mut build_indices, + )?; + + assert_eq!(prob_indices.len(), 2); + assert_eq!(build_indices.len(), 2); + assert_eq!(prob_indices, vec![0, 1]); + assert_eq!(build_indices, vec![6, 1]); // Match 6 (probe 0), Match 1 (probe 1) + + // Offset to resume at 5 (index 4) for probe 1 + assert_eq!(result_offset, Some((1, Some(5)))); + + // Third call: Should get last match for key 10, then one match for key 15 + current_offset = result_offset.unwrap(); + + let result_offset = array_map.get_matched_indices_with_limit_offset( + &prob_side_keys, + batch_size, + current_offset, + &mut prob_indices, + &mut build_indices, + )?; + + assert_eq!(prob_indices.len(), 2); + assert_eq!(build_indices.len(), 2); + assert_eq!(prob_indices, vec![1, 2]); + assert_eq!(build_indices, vec![4, 2]); // Match 4 (probe 1), Match 2 (probe 2) + + // Offset to resume at 6 (index 5) for probe 2 + assert_eq!(result_offset, Some((2, Some(6)))); + + // Fourth call: Should get last match for key 15 + current_offset = result_offset.unwrap(); + + let result_offset = array_map.get_matched_indices_with_limit_offset( + &prob_side_keys, + batch_size, + current_offset, + &mut prob_indices, + &mut build_indices, + )?; + + assert_eq!(prob_indices.len(), 2); + assert_eq!(build_indices.len(), 2); + assert_eq!(prob_indices, vec![2, 3]); + assert_eq!(build_indices, vec![5, 7]); // Match 5 (probe 2) + assert_eq!(Some((3, Some(0))), result_offset); + + current_offset = result_offset.unwrap(); + let result_offset = array_map.get_matched_indices_with_limit_offset( + &prob_side_keys, + batch_size, + current_offset, + &mut prob_indices, + &mut build_indices, + )?; + + assert_eq!(prob_indices.len(), 1); + assert_eq!(build_indices.len(), 1); + assert_eq!(prob_indices, vec![4]); + assert_eq!(build_indices, vec![8]); + assert!(result_offset.is_none()); + + Ok(()) + } + + #[test] + fn test_array_map_with_limit_and_misses() -> Result<()> { + let build_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2])); + let array_map = ArrayMap::try_new(&build_array, 1, 2)?; + let probe_array: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 1, 2])); + let prob_side_keys = [probe_array]; + + let mut prob_indices = Vec::new(); + let mut build_indices = Vec::new(); + + // batch_size=2, first call should skip 3 misses and return 2 hits + let result_offset = array_map.get_matched_indices_with_limit_offset( + &prob_side_keys, + 2, + (0, None), + &mut prob_indices, + &mut build_indices, + )?; + + assert_eq!(prob_indices, vec![3, 4]); + assert_eq!(build_indices, vec![0, 1]); + assert!(result_offset.is_none()); + Ok(()) + } + + #[test] + fn test_array_map_with_build_duplicates_and_misses() -> Result<()> { + let build_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 1])); + let array_map = ArrayMap::try_new(&build_array, 1, 1)?; + // prob: 10(m), 1(h1, h2), 20(m), 1(h1, h2) + let probe_array: ArrayRef = Arc::new(Int32Array::from(vec![10, 1, 20, 1])); + let prob_side_keys = [probe_array]; + + let mut prob_indices = Vec::new(); + let mut build_indices = Vec::new(); + + // batch_size=3, should get 2 matches from first '1' and 1 match from second '1' + let result_offset = array_map.get_matched_indices_with_limit_offset( + &prob_side_keys, + 3, + (0, None), + &mut prob_indices, + &mut build_indices, + )?; + + assert_eq!(prob_indices, vec![1, 1, 3]); + assert_eq!(build_indices, vec![0, 1, 0]); + assert_eq!(result_offset, Some((3, Some(2)))); + Ok(()) + } + + #[test] + fn test_array_map_i64_with_negative_and_positive_numbers() -> Result<()> { + // Build array with a mix of negative and positive i64 values, no duplicates + let build_array: ArrayRef = Arc::new(Int64Array::from(vec![-5, 0, 5, -2, 3, 10])); + let min_val = -5_i128; + let max_val = 10_i128; + + let array_map = ArrayMap::try_new(&build_array, min_val as u64, max_val as u64)?; + + // Probe array + let probe_array: ArrayRef = Arc::new(Int64Array::from(vec![0, -5, 10, -1])); + let prob_side_keys = [Arc::clone(&probe_array)]; + + let mut prob_indices = Vec::new(); + let mut build_indices = Vec::new(); + + // Call once to get all matches + let result_offset = array_map.get_matched_indices_with_limit_offset( + &prob_side_keys, + 10, // A batch size larger than number of probes + (0, None), + &mut prob_indices, + &mut build_indices, + )?; + + // Expected matches, in probe-side order: + // Probe 0 (value 0) -> Build 1 (value 0) + // Probe 1 (value -5) -> Build 0 (value -5) + // Probe 2 (value 10) -> Build 5 (value 10) + let expected_prob_indices = vec![0, 1, 2]; + let expected_build_indices = vec![1, 0, 5]; + + assert_eq!(prob_indices, expected_prob_indices); + assert_eq!(build_indices, expected_build_indices); + assert!(result_offset.is_none()); + + Ok(()) + } +} diff --git a/datafusion/physical-plan/src/joins/chain.rs b/datafusion/physical-plan/src/joins/chain.rs new file mode 100644 index 0000000000000..846b7505d6478 --- /dev/null +++ b/datafusion/physical-plan/src/joins/chain.rs @@ -0,0 +1,69 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::ops::Sub; + +use arrow::datatypes::ArrowNativeType; + +use crate::joins::MapOffset; + +/// Traverses the chain of matching indices, collecting results up to the remaining limit. +/// Returns `Some(offset)` if the limit was reached and there are more results to process, +/// or `None` if the chain was fully traversed. +#[inline(always)] +pub(crate) fn traverse_chain( + next_chain: &[T], + prob_idx: usize, + start_chain_idx: T, + remaining: &mut usize, + input_indices: &mut Vec, + match_indices: &mut Vec, + is_last_input: bool, +) -> Option +where + T: Copy + TryFrom + PartialOrd + Into + Sub, + >::Error: Debug, + T: ArrowNativeType, +{ + let zero = T::usize_as(0); + let one = T::usize_as(1); + let mut match_row_idx = start_chain_idx - one; + + loop { + match_indices.push(match_row_idx.into()); + input_indices.push(prob_idx as u32); + *remaining -= 1; + + let next = next_chain[match_row_idx.into() as usize]; + + if *remaining == 0 { + // Limit reached - return offset for next call + return if is_last_input && next == zero { + // Finished processing the last input row + None + } else { + Some((prob_idx, Some(next.into()))) + }; + } + if next == zero { + // End of chain + return None; + } + match_row_idx = next - one; + } +} diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs index 91fc1ee4436ee..722c1a46eea7d 100644 --- a/datafusion/physical-plan/src/joins/hash_join/exec.rs +++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs @@ -27,6 +27,8 @@ use crate::filter_pushdown::{ ChildPushdownResult, FilterDescription, FilterPushdownPhase, FilterPushdownPropagation, }; +use crate::joins::Map; +use crate::joins::array_map::ArrayMap; use crate::joins::hash_join::inlist_builder::build_struct_inlist_values; use crate::joins::hash_join::shared_bounds::{ ColumnBounds, PartitionBounds, PushdownStrategy, SharedBuildAccumulator, @@ -40,6 +42,7 @@ use crate::joins::utils::{ swap_join_projection, update_hash, }; use crate::joins::{JoinOn, JoinOnRef, PartitionMode, SharedBitmapBuilder}; +use crate::metrics::{Count, MetricBuilder}; use crate::projection::{ EmbeddedProjection, JoinData, ProjectionExec, try_embed_projection, try_pushdown_through_join, @@ -67,8 +70,8 @@ use arrow_schema::DataType; use datafusion_common::config::ConfigOptions; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ - JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, plan_err, - project_schema, + JoinSide, JoinType, NullEquality, Result, assert_or_internal_err, internal_err, + plan_err, project_schema, }; use datafusion_execution::TaskContext; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; @@ -92,11 +95,89 @@ use super::partitioned_hash_eval::SeededRandomState; pub(crate) const HASH_JOIN_SEED: SeededRandomState = SeededRandomState::with_seeds('J' as u64, 'O' as u64, 'I' as u64, 'N' as u64); +const ARRAY_MAP_CREATED_COUNT_METRIC_NAME: &str = "array_map_created_count"; + +#[expect(clippy::too_many_arguments)] +fn try_create_array_map( + bounds: &Option, + schema: &SchemaRef, + batches: &[RecordBatch], + on_left: &[PhysicalExprRef], + reservation: &mut MemoryReservation, + perfect_hash_join_small_build_threshold: usize, + perfect_hash_join_min_key_density: f64, + null_equality: NullEquality, +) -> Result)>> { + if on_left.len() != 1 { + return Ok(None); + } + + if null_equality == NullEquality::NullEqualsNull { + for batch in batches.iter() { + let arrays = evaluate_expressions_to_arrays(on_left, batch)?; + if arrays[0].null_count() > 0 { + return Ok(None); + } + } + } + + let (min_val, max_val) = if let Some(bounds) = bounds { + let (min_val, max_val) = if let Some(cb) = bounds.get_column_bounds(0) { + (cb.min.clone(), cb.max.clone()) + } else { + return Ok(None); + }; + + if min_val.is_null() || max_val.is_null() { + return Ok(None); + } + + if min_val > max_val { + return internal_err!("min_val>max_val"); + } + + if let Some((mi, ma)) = + ArrayMap::key_to_u64(&min_val).zip(ArrayMap::key_to_u64(&max_val)) + { + (mi, ma) + } else { + return Ok(None); + } + } else { + return Ok(None); + }; + + let range = ArrayMap::calculate_range(min_val, max_val); + let num_row: usize = batches.iter().map(|x| x.num_rows()).sum(); + let dense_ratio = (num_row as f64) / ((range + 1) as f64); + + // TODO: support create ArrayMap + if num_row >= u32::MAX as usize { + return Ok(None); + } + + if range >= perfect_hash_join_small_build_threshold as u64 + && dense_ratio <= perfect_hash_join_min_key_density + { + return Ok(None); + } + + let mem_size = ArrayMap::estimate_memory_size(min_val, max_val, num_row); + reservation.try_grow(mem_size)?; + + let batch = concat_batches(schema, batches)?; + let left_values = evaluate_expressions_to_arrays(on_left, &batch)?; + + let array_map = ArrayMap::try_new(&left_values[0], min_val, max_val)?; + + Ok(Some((array_map, batch, left_values))) +} + /// HashTable and input data for the left (build side) of a join pub(super) struct JoinLeftData { /// The hash table with indices into `batch` /// Arc is used to allow sharing with SharedBuildAccumulator for hash map pushdown - pub(super) hash_map: Arc, + pub(super) map: Arc, /// The input rows for the build side batch: RecordBatch, /// The build side on expressions values @@ -121,9 +202,9 @@ pub(super) struct JoinLeftData { } impl JoinLeftData { - /// return a reference to the hash map - pub(super) fn hash_map(&self) -> &dyn JoinHashMapType { - &*self.hash_map + /// return a reference to the map + pub(super) fn map(&self) -> &Map { + &self.map } /// returns a reference to the build side batch @@ -168,6 +249,34 @@ impl JoinLeftData { /// ` != `) are known as "filter expressions" and are evaluated /// after the equijoin predicates. /// +/// # ArrayMap Optimization +/// +/// For joins with a single integer-based join key, `HashJoinExec` may use an [`ArrayMap`] +/// (also known as a "perfect hash join") instead of a general-purpose hash map. +/// This optimization is used when: +/// 1. There is exactly one join key. +/// 2. The join key can be any integer type convertible to u64 (excluding i128 and u128). +/// 3. The range of keys is small enough (controlled by `perfect_hash_join_small_build_threshold`) +/// OR the keys are sufficiently dense (controlled by `perfect_hash_join_min_key_density`). +/// 4. build_side.num_rows() < u32::MAX +/// +/// See [`try_create_array_map`] for more details. +/// +/// Note that when using [`PartitionMode::Partitioned`], the build side is split into multiple +/// partitions. This can cause a dense build side to become sparse within each partition, +/// potentially disabling this optimization. +/// +/// For example, consider: +/// ```sql +/// SELECT t1.value, t2.value +/// FROM range(10000) AS t1 +/// JOIN range(10000) AS t2 +/// ON t1.value = t2.value; +/// ``` +/// With 24 partitions, each partition will only receive a subset of the 10,000 rows. +/// The first partition might contain values like `3, 10, 18, 39, 43`, which are sparse +/// relative to the original range, even though the overall data set is dense. +/// /// # "Build Side" vs "Probe Side" /// /// HashJoin takes two inputs, which are referred to as the "build" and the @@ -201,9 +310,9 @@ impl JoinLeftData { /// Resulting hash table stores hashed join-key fields for each row as a key, and /// indices of corresponding rows in concatenated batch. /// -/// Hash join uses LIFO data structure as a hash table, and in order to retain -/// original build-side input order while obtaining data during probe phase, hash -/// table is updated by iterating batch sequence in reverse order -- it allows to +/// When using the standard `JoinHashMap`, hash join uses LIFO data structure as a hash table, +/// and in order to retain original build-side input order while obtaining data during probe phase, +/// hash table is updated by iterating batch sequence in reverse order -- it allows to /// keep rows with smaller indices "on the top" of hash table, and still maintain /// correct indexing for concatenated build-side data batch. /// @@ -949,6 +1058,10 @@ impl ExecutionPlan for HashJoinExec { .unwrap_or(false); let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); + + let array_map_created_count = MetricBuilder::new(&self.metrics) + .counter(ARRAY_MAP_CREATED_COUNT_METRIC_NAME, partition); + let left_fut = match self.mode { PartitionMode::CollectLeft => self.left_fut.try_once(|| { let left_stream = self.left.execute(0, Arc::clone(&context))?; @@ -965,16 +1078,9 @@ impl ExecutionPlan for HashJoinExec { need_produce_result_in_final(self.join_type), self.right().output_partitioning().partition_count(), enable_dynamic_filter_pushdown, - context - .session_config() - .options() - .optimizer - .hash_join_inlist_pushdown_max_size, - context - .session_config() - .options() - .optimizer - .hash_join_inlist_pushdown_max_distinct_values, + Arc::clone(context.session_config().options()), + self.null_equality, + array_map_created_count, )) })?, PartitionMode::Partitioned => { @@ -993,16 +1099,9 @@ impl ExecutionPlan for HashJoinExec { need_produce_result_in_final(self.join_type), 1, enable_dynamic_filter_pushdown, - context - .session_config() - .options() - .optimizer - .hash_join_inlist_pushdown_max_size, - context - .session_config() - .options() - .optimizer - .hash_join_inlist_pushdown_max_distinct_values, + Arc::clone(context.session_config().options()), + self.null_equality, + array_map_created_count, )) } PartitionMode::Auto => { @@ -1364,6 +1463,19 @@ impl BuildSideState { } } +fn should_collect_min_max_for_perfect_hash( + on_left: &[PhysicalExprRef], + schema: &SchemaRef, +) -> Result { + if on_left.len() != 1 { + return Ok(false); + } + + let expr = &on_left[0]; + let data_type = expr.data_type(schema)?; + Ok(ArrayMap::is_supported_type(&data_type)) +} + /// Collects all batches from the left (build) side stream and creates a hash map for joining. /// /// This function is responsible for: @@ -1402,20 +1514,21 @@ async fn collect_left_input( with_visited_indices_bitmap: bool, probe_threads_count: usize, should_compute_dynamic_filters: bool, - max_inlist_size: usize, - max_inlist_distinct_values: usize, + config: Arc, + null_equality: NullEquality, + array_map_created_count: Count, ) -> Result { let schema = left_stream.schema(); - // This operation performs 2 steps at once: - // 1. creates a [JoinHashMap] of all batches from the stream - // 2. stores the batches in a vector. + let should_collect_min_max_for_phj = + should_collect_min_max_for_perfect_hash(&on_left, &schema)?; + let initial = BuildSideState::try_new( metrics, reservation, on_left.clone(), &schema, - should_compute_dynamic_filters, + should_compute_dynamic_filters || should_collect_min_max_for_phj, )?; let state = left_stream @@ -1452,50 +1565,85 @@ async fn collect_left_input( bounds_accumulators, } = state; - // Estimation of memory size, required for hashtable, prior to allocation. - // Final result can be verified using `RawTable.allocation_info()` - let fixed_size_u32 = size_of::(); - let fixed_size_u64 = size_of::(); - - // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the - // `u64` indice variant - // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown - let mut hashmap: Box = if num_rows > u32::MAX as usize { - let estimated_hashtable_size = - estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU64::with_capacity(num_rows)) - } else { - let estimated_hashtable_size = - estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; - reservation.try_grow(estimated_hashtable_size)?; - metrics.build_mem_used.add(estimated_hashtable_size); - Box::new(JoinHashMapU32::with_capacity(num_rows)) + // Compute bounds + let mut bounds = match bounds_accumulators { + Some(accumulators) if num_rows > 0 => { + let bounds = accumulators + .into_iter() + .map(CollectLeftAccumulator::evaluate) + .collect::>>()?; + Some(PartitionBounds::new(bounds)) + } + _ => None, }; - let mut hashes_buffer = Vec::new(); - let mut offset = 0; - - // Updating hashmap starting from the last batch - let batches_iter = batches.iter().rev(); - for batch in batches_iter.clone() { - hashes_buffer.clear(); - hashes_buffer.resize(batch.num_rows(), 0); - update_hash( + let (join_hash_map, batch, left_values) = + if let Some((array_map, batch, left_value)) = try_create_array_map( + &bounds, + &schema, + &batches, &on_left, - batch, - &mut *hashmap, - offset, - &random_state, - &mut hashes_buffer, - 0, - true, - )?; - offset += batch.num_rows(); - } - // Merge all batches into a single batch, so we can directly index into the arrays - let batch = concat_batches(&schema, batches_iter)?; + &mut reservation, + config.execution.perfect_hash_join_small_build_threshold, + config.execution.perfect_hash_join_min_key_density, + null_equality, + )? { + array_map_created_count.add(1); + metrics.build_mem_used.add(array_map.size()); + + (Map::ArrayMap(array_map), batch, left_value) + } else { + // Estimation of memory size, required for hashtable, prior to allocation. + // Final result can be verified using `RawTable.allocation_info()` + let fixed_size_u32 = size_of::(); + let fixed_size_u64 = size_of::(); + + // Use `u32` indices for the JoinHashMap when num_rows ≤ u32::MAX, otherwise use the + // `u64` indice variant + // Arc is used instead of Box to allow sharing with SharedBuildAccumulator for hash map pushdown + let mut hashmap: Box = if num_rows > u32::MAX as usize { + let estimated_hashtable_size = + estimate_memory_size::<(u64, u64)>(num_rows, fixed_size_u64)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU64::with_capacity(num_rows)) + } else { + let estimated_hashtable_size = + estimate_memory_size::<(u32, u64)>(num_rows, fixed_size_u32)?; + reservation.try_grow(estimated_hashtable_size)?; + metrics.build_mem_used.add(estimated_hashtable_size); + Box::new(JoinHashMapU32::with_capacity(num_rows)) + }; + + let mut hashes_buffer = Vec::new(); + let mut offset = 0; + + let batches_iter = batches.iter().rev(); + + // Updating hashmap starting from the last batch + for batch in batches_iter.clone() { + hashes_buffer.clear(); + hashes_buffer.resize(batch.num_rows(), 0); + update_hash( + &on_left, + batch, + &mut *hashmap, + offset, + &random_state, + &mut hashes_buffer, + 0, + true, + )?; + offset += batch.num_rows(); + } + + // Merge all batches into a single batch, so we can directly index into the arrays + let batch = concat_batches(&schema, batches_iter.clone())?; + + let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; + + (Map::HashMap(hashmap), batch, left_values) + }; // Reserve additional memory for visited indices bitmap and create shared builder let visited_indices_bitmap = if with_visited_indices_bitmap { @@ -1510,22 +1658,7 @@ async fn collect_left_input( BooleanBufferBuilder::new(0) }; - let left_values = evaluate_expressions_to_arrays(&on_left, &batch)?; - - // Compute bounds for dynamic filter if enabled - let bounds = match bounds_accumulators { - Some(accumulators) if num_rows > 0 => { - let bounds = accumulators - .into_iter() - .map(CollectLeftAccumulator::evaluate) - .collect::>>()?; - Some(PartitionBounds::new(bounds)) - } - _ => None, - }; - - // Convert Box to Arc for sharing with SharedBuildAccumulator - let hash_map: Arc = hashmap.into(); + let map = Arc::new(join_hash_map); let membership = if num_rows == 0 { PushdownStrategy::Empty @@ -1539,19 +1672,26 @@ async fn collect_left_input( .sum::(); if left_values.is_empty() || left_values[0].is_empty() - || estimated_size > max_inlist_size - || hash_map.len() > max_inlist_distinct_values + || estimated_size > config.optimizer.hash_join_inlist_pushdown_max_size + || map.num_of_distinct_key() + > config + .optimizer + .hash_join_inlist_pushdown_max_distinct_values { - PushdownStrategy::HashTable(Arc::clone(&hash_map)) + PushdownStrategy::HashTable(Arc::clone(&map)) } else if let Some(in_list_values) = build_struct_inlist_values(&left_values)? { PushdownStrategy::InList(in_list_values) } else { - PushdownStrategy::HashTable(Arc::clone(&hash_map)) + PushdownStrategy::HashTable(Arc::clone(&map)) } }; + if should_collect_min_max_for_phj && !should_compute_dynamic_filters { + bounds = None; + } + let data = JoinLeftData { - hash_map, + map, batch, values: left_values, visited_indices_bitmap: Mutex::new(visited_indices_bitmap), @@ -1567,6 +1707,27 @@ async fn collect_left_input( #[cfg(test)] mod tests { use super::*; + + fn assert_phj_used(metrics: &MetricsSet, use_phj: bool) { + if use_phj { + assert!( + metrics + .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) + .expect("should have array_map_created_count metrics") + .as_usize() + >= 1 + ); + } else { + assert_eq!( + metrics + .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) + .map(|v| v.as_usize()) + .unwrap_or(0), + 0 + ) + } + } + use crate::coalesce_partitions::CoalescePartitionsExec; use crate::joins::hash_join::stream::lookup_join_hashmap; use crate::test::{TestMemoryExec, assert_join_metrics}; @@ -1601,10 +1762,37 @@ mod tests { #[template] #[rstest] - fn batch_sizes(#[values(8192, 10, 5, 2, 1)] batch_size: usize) {} + fn hash_join_exec_configs( + #[values(8192, 10, 5, 2, 1)] batch_size: usize, + #[values(true, false)] use_perfect_hash_join_as_possible: bool, + ) { + } - fn prepare_task_ctx(batch_size: usize) -> Arc { - let session_config = SessionConfig::default().with_batch_size(batch_size); + fn prepare_task_ctx( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Arc { + let mut session_config = SessionConfig::default().with_batch_size(batch_size); + + if use_perfect_hash_join_as_possible { + session_config + .options_mut() + .execution + .perfect_hash_join_small_build_threshold = 819200; + session_config + .options_mut() + .execution + .perfect_hash_join_min_key_density = 0.0; + } else { + session_config + .options_mut() + .execution + .perfect_hash_join_small_build_threshold = 0; + session_config + .options_mut() + .execution + .perfect_hash_join_min_key_density = 1.0 / 0.0; + } Arc::new(TaskContext::default().with_session_config(session_config)) } @@ -1772,10 +1960,13 @@ mod tests { Ok((columns, batches, metrics)) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_inner_one(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_inner_one( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 5]), // this has a repetition @@ -1818,14 +2009,18 @@ mod tests { } assert_join_metrics!(metrics, 3); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn partitioned_join_inner_one(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn partitioned_join_inner_one( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 5]), // this has a repetition @@ -1866,6 +2061,7 @@ mod tests { } assert_join_metrics!(metrics, 3); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); Ok(()) } @@ -1967,10 +2163,13 @@ mod tests { Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_inner_two(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_inner_two( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 2]), ("b2", &vec![1, 2, 2]), @@ -2044,10 +2243,13 @@ mod tests { } /// Test where the left has 2 parts, the right with 1 part => 1 part - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_inner_one_two_parts_left(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_inner_one_two_parts_left( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let batch1 = build_table_i32( ("a1", &vec![1, 2]), ("b2", &vec![1, 2]), @@ -2189,10 +2391,13 @@ mod tests { } /// Test where the left has 1 part, the right has 2 parts => 2 parts - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_inner_one_two_parts_right(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_inner_one_two_parts_right( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 5]), // this has a repetition @@ -2293,6 +2498,9 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } @@ -2306,10 +2514,13 @@ mod tests { TestMemoryExec::try_new_exec(&[vec![batch.clone(), batch]], schema, None).unwrap() } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_left_multi_batch(batch_size: usize) { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_left_multi_batch( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -2326,9 +2537,9 @@ mod tests { )]; let join = join( - left, - right, - on, + Arc::clone(&left), + Arc::clone(&right), + on.clone(), &JoinType::Left, NullEquality::NullEqualsNothing, ) @@ -2337,8 +2548,15 @@ mod tests { let columns = columns(&join.schema()); assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); - let stream = join.execute(0, task_ctx).unwrap(); - let batches = common::collect(stream).await.unwrap(); + let (_, batches, metrics) = join_collect( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &JoinType::Left, + NullEquality::NullEqualsNothing, + task_ctx, + ) + .await?; allow_duplicates! { assert_snapshot!(batches_to_sort_string(&batches), @r" @@ -2353,12 +2571,18 @@ mod tests { +----+----+----+----+----+----+ "); } + + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + return Ok(()); } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_full_multi_batch(batch_size: usize) { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_full_multi_batch( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -2389,6 +2613,7 @@ mod tests { let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); + let metrics = join.metrics().unwrap(); allow_duplicates! { assert_snapshot!(batches_to_sort_string(&batches), @r" @@ -2405,12 +2630,17 @@ mod tests { +----+----+----+----+----+----+ "); } + + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_left_empty_right(batch_size: usize) { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_left_empty_right( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), @@ -2437,6 +2667,7 @@ mod tests { let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); + let metrics = join.metrics().unwrap(); allow_duplicates! { assert_snapshot!(batches_to_sort_string(&batches), @r" @@ -2449,12 +2680,17 @@ mod tests { +----+----+----+----+----+----+ "); } + + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_full_empty_right(batch_size: usize) { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_full_empty_right( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), @@ -2481,6 +2717,7 @@ mod tests { let stream = join.execute(0, task_ctx).unwrap(); let batches = common::collect(stream).await.unwrap(); + let metrics = join.metrics().unwrap(); allow_duplicates! { assert_snapshot!(batches_to_sort_string(&batches), @r" @@ -2493,12 +2730,17 @@ mod tests { +----+----+----+----+----+----+ "); } + + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_left_one(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_left_one( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -2539,14 +2781,18 @@ mod tests { } assert_join_metrics!(metrics, 3); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn partitioned_join_left_one(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn partitioned_join_left_one( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -2587,6 +2833,7 @@ mod tests { } assert_join_metrics!(metrics, 3); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); Ok(()) } @@ -2611,10 +2858,13 @@ mod tests { ) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_left_semi(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_left_semi( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); // left_table left semi join right_table on left_table.b1 = right_table.b2 @@ -2650,13 +2900,19 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_left_semi_with_filter(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_left_semi_with_filter( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); @@ -2712,6 +2968,9 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + // left_table left semi join right_table on left_table.b1 = right_table.b2 and right_table.a2 > 10 let filter_expression = Arc::new(BinaryExpr::new( Arc::new(Column::new("x", 0)), @@ -2749,13 +3008,19 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_right_semi(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_right_semi( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); @@ -2792,13 +3057,19 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_right_semi_with_filter(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_right_semi_with_filter( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); @@ -2855,6 +3126,9 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + // left_table right semi join right_table on left_table.b1 = right_table.b2 on left_table.a1!=9 let filter_expression = Arc::new(BinaryExpr::new( Arc::new(Column::new("x", 0)), @@ -2891,13 +3165,19 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_left_anti(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_left_anti( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); // left_table left anti join right_table on left_table.b1 = right_table.b2 @@ -2932,13 +3212,20 @@ mod tests { +----+----+----+ "); } + + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_left_anti_with_filter(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_left_anti_with_filter( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2!=8 @@ -2995,6 +3282,9 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + // left_table left anti join right_table on left_table.b1 = right_table.b2 and right_table.a2 != 13 let filter_expression = Arc::new(BinaryExpr::new( Arc::new(Column::new("x", 0)), @@ -3038,13 +3328,19 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_right_anti(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_right_anti( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); let on = vec![( @@ -3078,13 +3374,20 @@ mod tests { +----+----+-----+ "); } + + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_right_anti_with_filter(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_right_anti_with_filter( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_semi_anti_left_table(); let right = build_semi_anti_right_table(); // left_table right anti join right_table on left_table.b1 = right_table.b2 and left_table.a1!=13 @@ -3142,6 +3445,9 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + // left_table right anti join right_table on left_table.b1 = right_table.b2 and right_table.b2!=8 let column_indices = vec![ColumnIndex { index: 1, @@ -3188,13 +3494,19 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_right_one(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_right_one( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), @@ -3235,14 +3547,18 @@ mod tests { } assert_join_metrics!(metrics, 3); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn partitioned_join_right_one(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn partitioned_join_right_one( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), @@ -3283,14 +3599,18 @@ mod tests { } assert_join_metrics!(metrics, 3); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_full_one(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_full_one( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -3333,13 +3653,19 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_left_mark(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_left_mark( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -3380,14 +3706,18 @@ mod tests { } assert_join_metrics!(metrics, 3); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn partitioned_join_left_mark(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn partitioned_join_left_mark( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -3428,14 +3758,18 @@ mod tests { } assert_join_metrics!(metrics, 3); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_right_mark(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_right_mark( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -3475,14 +3809,18 @@ mod tests { assert_batches_sorted_eq!(expected, &batches); assert_join_metrics!(metrics, 3); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn partitioned_join_right_mark(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn partitioned_join_right_mark( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a1", &vec![1, 2, 3]), ("b1", &vec![4, 5, 7]), // 7 does not exist on the right @@ -3523,6 +3861,7 @@ mod tests { assert_batches_sorted_eq!(expected, &batches); assert_join_metrics!(metrics, 4); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); Ok(()) } @@ -3729,10 +4068,13 @@ mod tests { ) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_inner_with_filter(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_inner_with_filter( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a", &vec![0, 1, 2, 2]), ("b", &vec![4, 5, 7, 8]), @@ -3775,13 +4117,19 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_left_with_filter(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_left_with_filter( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a", &vec![0, 1, 2, 2]), ("b", &vec![4, 5, 7, 8]), @@ -3827,13 +4175,19 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_right_with_filter(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_right_with_filter( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a", &vec![0, 1, 2, 2]), ("b", &vec![4, 5, 7, 8]), @@ -3878,13 +4232,19 @@ mod tests { "); } + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + Ok(()) } - #[apply(batch_sizes)] + #[apply(hash_join_exec_configs)] #[tokio::test] - async fn join_full_with_filter(batch_size: usize) -> Result<()> { - let task_ctx = prepare_task_ctx(batch_size); + async fn join_full_with_filter( + batch_size: usize, + use_perfect_hash_join_as_possible: bool, + ) -> Result<()> { + let task_ctx = prepare_task_ctx(batch_size, use_perfect_hash_join_as_possible); let left = build_table( ("a", &vec![0, 1, 2, 2]), ("b", &vec![4, 5, 7, 8]), @@ -3931,6 +4291,9 @@ mod tests { ]; assert_batches_sorted_eq!(expected, &batches); + let metrics = join.metrics().unwrap(); + assert_phj_used(&metrics, use_perfect_hash_join_as_possible); + // THIS MIGRATION HALTED DUE TO ISSUE #15312 //allow_duplicates! { // assert_snapshot!(batches_to_sort_string(&batches), @r#" @@ -4280,7 +4643,7 @@ mod tests { // validation of partial join results output for different batch_size setting for join_type in join_types { for batch_size in (1..21).rev() { - let task_ctx = prepare_task_ctx(batch_size); + let task_ctx = prepare_task_ctx(batch_size, true); let join = join( Arc::clone(&left), @@ -4719,4 +5082,187 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_perfect_hash_join_with_negative_numbers() -> Result<()> { + let task_ctx = prepare_task_ctx(8192, true); + let left = build_table( + ("a1", &vec![1, 2, 3]), + ("b1", &vec![-1, 0, 1]), + ("c1", &vec![4, 5, 6]), + ); + let right = build_table( + ("a2", &vec![10, 20, 30, 40]), + ("b1", &vec![1, -1, 0, 2]), + ("c2", &vec![70, 80, 90, 100]), + ); + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let (columns, batches, metrics) = join_collect( + Arc::clone(&left), + Arc::clone(&right), + on.clone(), + &JoinType::Inner, + NullEquality::NullEqualsNothing, + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a1", "b1", "c1", "a2", "b1", "c2"]); + + assert_batches_sorted_eq!( + [ + "+----+----+----+----+----+----+", + "| a1 | b1 | c1 | a2 | b1 | c2 |", + "+----+----+----+----+----+----+", + "| 1 | -1 | 4 | 20 | -1 | 80 |", + "| 2 | 0 | 5 | 30 | 0 | 90 |", + "| 3 | 1 | 6 | 10 | 1 | 70 |", + "+----+----+----+----+----+----+", + ], + &batches + ); + + let array_map_created = metrics + .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) + .unwrap(); + assert_eq!(array_map_created.as_usize(), 1); + + Ok(()) + } + + #[tokio::test] + async fn test_phj_null_equals_null_build_probe_all_have_nulls() -> Result<()> { + let task_ctx = prepare_task_ctx(8192, true); + let left_schema = Arc::new(Schema::new(vec![ + Field::new("a1", DataType::Int32, true), + Field::new("b1", DataType::Int32, true), + ])); + let left_batch = RecordBatch::try_new( + Arc::clone(&left_schema), + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrayRef, + Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef, + ], + )?; + let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?; + + let right_schema = Arc::new(Schema::new(vec![ + Field::new("a2", DataType::Int32, true), + Field::new("b1", DataType::Int32, true), + ])); + let right_batch = RecordBatch::try_new( + Arc::clone(&right_schema), + vec![ + Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef, + Arc::new(Int32Array::from(vec![Some(10), None])) as ArrayRef, + ], + )?; + let right = + TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?; + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let (columns, batches, metrics) = join_collect( + left, + right, + on, + &JoinType::Inner, + NullEquality::NullEqualsNull, + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]); + assert_batches_sorted_eq!( + [ + "+----+----+----+----+", + "| a1 | b1 | a2 | b1 |", + "+----+----+----+----+", + "| 1 | 10 | 3 | 10 |", + "| 2 | | 4 | |", + "+----+----+----+----+", + ], + &batches + ); + + let array_map_created = metrics + .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) + .unwrap(); + assert_eq!(array_map_created.as_usize(), 0); + + Ok(()) + } + + #[tokio::test] + async fn test_phj_null_equals_null_probe_no_nulls() -> Result<()> { + let task_ctx = prepare_task_ctx(8192, true); + let left_schema = Arc::new(Schema::new(vec![ + Field::new("a1", DataType::Int32, true), + Field::new("b1", DataType::Int32, true), + ])); + let left_batch = RecordBatch::try_new( + Arc::clone(&left_schema), + vec![ + Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])) as ArrayRef, + Arc::new(Int32Array::from(vec![Some(10), Some(20), None])) as ArrayRef, + ], + )?; + let left = TestMemoryExec::try_new_exec(&[vec![left_batch]], left_schema, None)?; + + let right_schema = Arc::new(Schema::new(vec![ + Field::new("a2", DataType::Int32, true), + Field::new("b1", DataType::Int32, true), + ])); + let right_batch = RecordBatch::try_new( + Arc::clone(&right_schema), + vec![ + Arc::new(Int32Array::from(vec![Some(3), Some(4)])) as ArrayRef, + Arc::new(Int32Array::from(vec![Some(10), Some(30)])) as ArrayRef, + ], + )?; + let right = + TestMemoryExec::try_new_exec(&[vec![right_batch]], right_schema, None)?; + + let on = vec![( + Arc::new(Column::new_with_schema("b1", &left.schema())?) as _, + Arc::new(Column::new_with_schema("b1", &right.schema())?) as _, + )]; + + let (columns, batches, metrics) = join_collect( + left, + right, + on, + &JoinType::Inner, + NullEquality::NullEqualsNull, + task_ctx, + ) + .await?; + + assert_eq!(columns, vec!["a1", "b1", "a2", "b1"]); + assert_batches_sorted_eq!( + [ + "+----+----+----+----+", + "| a1 | b1 | a2 | b1 |", + "+----+----+----+----+", + "| 1 | 10 | 3 | 10 |", + "+----+----+----+----+", + ], + &batches + ); + + let array_map_created = metrics + .sum_by_name(ARRAY_MAP_CREATED_COUNT_METRIC_NAME) + .unwrap(); + assert_eq!(array_map_created.as_usize(), 0); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs index 4c437e813139d..4253b6480fa23 100644 --- a/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs +++ b/datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs @@ -26,13 +26,14 @@ use arrow::{ datatypes::{DataType, Schema}, util::bit_util, }; -use datafusion_common::{Result, internal_datafusion_err, internal_err}; +use datafusion_common::{Result, internal_datafusion_err}; use datafusion_expr::ColumnarValue; -use datafusion_physical_expr_common::physical_expr::{ - DynHash, PhysicalExpr, PhysicalExprRef, +use datafusion_physical_expr_common::{ + physical_expr::{DynHash, PhysicalExpr, PhysicalExprRef}, + utils::evaluate_expressions_to_arrays, }; -use crate::{hash_utils::create_hashes, joins::utils::JoinHashMapType}; +use crate::{hash_utils::create_hashes, joins::Map}; /// RandomState wrapper that preserves the seeds used to create it. /// @@ -212,15 +213,15 @@ impl PhysicalExpr for HashExpr { } } -/// Physical expression that checks if hash values exist in a hash table +/// Physical expression that checks membership in a [`Map`] (hash table or array map). /// -/// Takes a UInt64Array of hash values and checks membership in a hash table. -/// Returns a BooleanArray indicating which hashes exist. +/// Returns a [`BooleanArray`] indicating if join keys (from `hash_expr`) exist in the map. +// TODO: rename to MapLookupExpr pub struct HashTableLookupExpr { - /// Expression that computes hash values (should be a HashExpr) - hash_expr: PhysicalExprRef, - /// Hash table to check against - hash_map: Arc, + /// Expression that computes hash values and identifies join key columns + hash_expr: Arc, + /// Map to check against + map: Arc, /// Description for display description: String, } @@ -229,21 +230,16 @@ impl HashTableLookupExpr { /// Create a new HashTableLookupExpr /// /// # Arguments - /// * `hash_expr` - Expression that computes hash values + /// * `hash_expr` - Expression that computes hash values and identifies join key columns /// * `hash_map` - Hash table to check membership /// * `description` - Description for debugging - /// /// # Note /// This is public for internal testing purposes only and is not /// guaranteed to be stable across versions. - pub fn new( - hash_expr: PhysicalExprRef, - hash_map: Arc, - description: String, - ) -> Self { + pub fn new(hash_expr: Arc, map: Arc, description: String) -> Self { Self { hash_expr, - hash_map, + map, description, } } @@ -266,7 +262,7 @@ impl Hash for HashTableLookupExpr { // hash maps to have the same content in practice. // Theoretically this is a public API and users could create identical hash maps, // but that seems unlikely and not worth paying the cost of deep comparison all the time. - Arc::as_ptr(&self.hash_map).hash(state); + Arc::as_ptr(&self.map).hash(state); } } @@ -281,7 +277,7 @@ impl PartialEq for HashTableLookupExpr { // but that seems unlikely and not worth paying the cost of deep comparison all the time. self.hash_expr.as_ref() == other.hash_expr.as_ref() && self.description == other.description - && Arc::ptr_eq(&self.hash_map, &other.hash_map) + && Arc::ptr_eq(&self.map, &other.map) } } @@ -299,22 +295,22 @@ impl PhysicalExpr for HashTableLookupExpr { } fn children(&self) -> Vec<&Arc> { - vec![&self.hash_expr] + self.hash_expr.children() } fn with_new_children( self: Arc, children: Vec>, ) -> Result> { - if children.len() != 1 { - return internal_err!( - "HashTableLookupExpr expects exactly 1 child, got {}", - children.len() - ); - } - Ok(Arc::new(HashTableLookupExpr::new( - Arc::clone(&children[0]), - Arc::clone(&self.hash_map), + let hash_expr = Arc::clone(&self.hash_expr).with_new_children(children)?; + let hash_expr = Arc::downcast::(hash_expr).map_err(|_e| { + internal_datafusion_err!( + "HashTableLookupExpr::with_new_children expected a HashExpr" + ) + })?; + Ok(Arc::new(Self::new( + hash_expr, + Arc::clone(&self.map), self.description.clone(), ))) } @@ -333,24 +329,38 @@ impl PhysicalExpr for HashTableLookupExpr { ) -> Result { let num_rows = batch.num_rows(); - // Evaluate hash expression to get hash values - let hash_array = self.hash_expr.evaluate(batch)?.into_array(num_rows)?; - let hash_array = hash_array.as_any().downcast_ref::().ok_or( - internal_datafusion_err!( - "HashTableLookupExpr expects UInt64Array from hash expression" - ), - )?; - - // Check each hash against the hash table - let mut buf = MutableBuffer::from_len_zeroed(bit_util::ceil(num_rows, 8)); - for (idx, hash_value) in hash_array.values().iter().enumerate() { - // Use get_matched_indices to check - if it returns any indices, the hash exists - let (matched_indices, _) = self - .hash_map - .get_matched_indices(Box::new(std::iter::once((idx, hash_value))), None); - - if !matched_indices.is_empty() { - bit_util::set_bit(buf.as_slice_mut(), idx); + let mut buf: MutableBuffer = + MutableBuffer::from_len_zeroed(bit_util::ceil(num_rows, 8)); + + match self.map.as_ref() { + Map::HashMap(hash_map) => { + // Evaluate hash expression to get hash values + let hash_array = self.hash_expr.evaluate(batch)?.into_array(num_rows)?; + let hash_array = hash_array + .as_any() + .downcast_ref::() + .ok_or(internal_datafusion_err!( + "HashTableLookupExpr expects UInt64Array from hash expression" + ))?; + + // TODO: maybe we can avoid traversing `hash_map.next` + // Check each hash against the hash table + for (idx, hash_value) in hash_array.values().iter().enumerate() { + // Use get_matched_indices to check - if it returns any indices, the hash exists + let (matched_indices, _) = hash_map.get_matched_indices( + Box::new(std::iter::once((idx, hash_value))), + None, + ); + + if !matched_indices.is_empty() { + bit_util::set_bit(buf.as_slice_mut(), idx); + } + } + } + Map::ArrayMap(array_map) => { + let right = + evaluate_expressions_to_arrays(self.hash_expr.on_columns(), batch)?; + array_map.mark_existing_probes(&right, &mut buf)?; } } @@ -482,13 +492,13 @@ mod tests { #[test] fn test_hash_table_lookup_expr_eq_same() { let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); - let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + let hash_expr = Arc::new(HashExpr::new( vec![Arc::clone(&col_a)], SeededRandomState::with_seeds(1, 2, 3, 4), "inner_hash".to_string(), )); - let hash_map: Arc = - Arc::new(JoinHashMapU32::with_capacity(10)); + let hash_map = + Arc::new(Map::HashMap(Box::new(JoinHashMapU32::with_capacity(10)))); let expr1 = HashTableLookupExpr::new( Arc::clone(&hash_expr), @@ -510,20 +520,20 @@ mod tests { let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); let col_b: PhysicalExprRef = Arc::new(Column::new("b", 1)); - let hash_expr1: PhysicalExprRef = Arc::new(HashExpr::new( + let hash_expr1 = Arc::new(HashExpr::new( vec![Arc::clone(&col_a)], SeededRandomState::with_seeds(1, 2, 3, 4), "inner_hash".to_string(), )); - let hash_expr2: PhysicalExprRef = Arc::new(HashExpr::new( + let hash_expr2 = Arc::new(HashExpr::new( vec![Arc::clone(&col_b)], SeededRandomState::with_seeds(1, 2, 3, 4), "inner_hash".to_string(), )); - let hash_map: Arc = - Arc::new(JoinHashMapU32::with_capacity(10)); + let hash_map = + Arc::new(Map::HashMap(Box::new(JoinHashMapU32::with_capacity(10)))); let expr1 = HashTableLookupExpr::new( Arc::clone(&hash_expr1), @@ -543,13 +553,13 @@ mod tests { #[test] fn test_hash_table_lookup_expr_eq_different_description() { let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); - let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + let hash_expr = Arc::new(HashExpr::new( vec![Arc::clone(&col_a)], SeededRandomState::with_seeds(1, 2, 3, 4), "inner_hash".to_string(), )); - let hash_map: Arc = - Arc::new(JoinHashMapU32::with_capacity(10)); + let hash_map = + Arc::new(Map::HashMap(Box::new(JoinHashMapU32::with_capacity(10)))); let expr1 = HashTableLookupExpr::new( Arc::clone(&hash_expr), @@ -569,18 +579,17 @@ mod tests { #[test] fn test_hash_table_lookup_expr_eq_different_hash_map() { let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); - let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + let hash_expr = Arc::new(HashExpr::new( vec![Arc::clone(&col_a)], SeededRandomState::with_seeds(1, 2, 3, 4), "inner_hash".to_string(), )); // Two different Arc pointers (even with same content) should not be equal - let hash_map1: Arc = - Arc::new(JoinHashMapU32::with_capacity(10)); - let hash_map2: Arc = - Arc::new(JoinHashMapU32::with_capacity(10)); - + let hash_map1 = + Arc::new(Map::HashMap(Box::new(JoinHashMapU32::with_capacity(10)))); + let hash_map2 = + Arc::new(Map::HashMap(Box::new(JoinHashMapU32::with_capacity(10)))); let expr1 = HashTableLookupExpr::new( Arc::clone(&hash_expr), hash_map1, @@ -600,13 +609,13 @@ mod tests { #[test] fn test_hash_table_lookup_expr_hash_consistency() { let col_a: PhysicalExprRef = Arc::new(Column::new("a", 0)); - let hash_expr: PhysicalExprRef = Arc::new(HashExpr::new( + let hash_expr = Arc::new(HashExpr::new( vec![Arc::clone(&col_a)], SeededRandomState::with_seeds(1, 2, 3, 4), "inner_hash".to_string(), )); - let hash_map: Arc = - Arc::new(JoinHashMapU32::with_capacity(10)); + let hash_map = + Arc::new(Map::HashMap(Box::new(JoinHashMapU32::with_capacity(10)))); let expr1 = HashTableLookupExpr::new( Arc::clone(&hash_expr), diff --git a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs index 7d34ce9acbd57..06b62a35869a7 100644 --- a/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs +++ b/datafusion/physical-plan/src/joins/hash_join/shared_bounds.rs @@ -23,13 +23,13 @@ use std::sync::Arc; use crate::ExecutionPlan; use crate::ExecutionPlanProperties; +use crate::joins::Map; use crate::joins::PartitionMode; use crate::joins::hash_join::exec::HASH_JOIN_SEED; use crate::joins::hash_join::inlist_builder::build_struct_fields; use crate::joins::hash_join::partitioned_hash_eval::{ HashExpr, HashTableLookupExpr, SeededRandomState, }; -use crate::joins::utils::JoinHashMapType; use arrow::array::ArrayRef; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; @@ -49,9 +49,9 @@ use tokio::sync::Barrier; #[derive(Debug, Clone, PartialEq)] pub(crate) struct ColumnBounds { /// The minimum value observed for this column - min: ScalarValue, + pub(crate) min: ScalarValue, /// The maximum value observed for this column - max: ScalarValue, + pub(crate) max: ScalarValue, } impl ColumnBounds { @@ -133,7 +133,7 @@ fn create_membership_predicate( on_right.to_vec(), random_state.clone(), "hash_join".to_string(), - )) as Arc; + )); Ok(Some(Arc::new(HashTableLookupExpr::new( lookup_hash_expr, @@ -241,7 +241,7 @@ pub(crate) enum PushdownStrategy { /// Use InList for small build sides (< 128MB) InList(ArrayRef), /// Use hash table lookup for large build sides - HashTable(Arc), + HashTable(Arc), /// There was no data in this partition, do not build a dynamic filter for it Empty, } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index e6735675125bd..a08ab2eedab3b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -23,6 +23,8 @@ use std::sync::Arc; use std::task::Poll; +use crate::joins::Map; +use crate::joins::MapOffset; use crate::joins::PartitionMode; use crate::joins::hash_join::exec::JoinLeftData; use crate::joins::hash_join::shared_bounds::{ @@ -34,7 +36,6 @@ use crate::joins::utils::{ use crate::{ RecordBatchStream, SendableRecordBatchStream, handle_state, hash_utils::create_hashes, - joins::join_hash_map::JoinHashMapOffset, joins::utils::{ BuildProbeJoinMetrics, ColumnIndex, JoinFilter, JoinHashMapType, StatefulStreamResult, adjust_indices_by_join_type, apply_join_filter_to_indices, @@ -154,13 +155,13 @@ pub(super) struct ProcessProbeBatchState { /// Probe-side on expressions values values: Vec, /// Starting offset for JoinHashMap lookups - offset: JoinHashMapOffset, + offset: MapOffset, /// Max joined probe-side index from current batch joined_probe_idx: Option, } impl ProcessProbeBatchState { - fn advance(&mut self, offset: JoinHashMapOffset, joined_probe_idx: Option) { + fn advance(&mut self, offset: MapOffset, joined_probe_idx: Option) { self.offset = offset; if joined_probe_idx.is_some() { self.joined_probe_idx = joined_probe_idx; @@ -287,10 +288,10 @@ pub(super) fn lookup_join_hashmap( null_equality: NullEquality, hashes_buffer: &[u64], limit: usize, - offset: JoinHashMapOffset, + offset: MapOffset, probe_indices_buffer: &mut Vec, build_indices_buffer: &mut Vec, -) -> Result<(UInt64Array, UInt32Array, Option)> { +) -> Result<(UInt64Array, UInt32Array, Option)> { let next_offset = build_hashmap.get_matched_indices_with_limit_offset( hashes_buffer, limit, @@ -552,9 +553,15 @@ impl HashJoinStream { // Precalculate hash values for fetched batch let keys_values = evaluate_expressions_to_arrays(&self.on_right, &batch)?; - self.hashes_buffer.clear(); - self.hashes_buffer.resize(batch.num_rows(), 0); - create_hashes(&keys_values, &self.random_state, &mut self.hashes_buffer)?; + if let Map::HashMap(_) = self.build_side.try_as_ready()?.left_data.map() { + self.hashes_buffer.clear(); + self.hashes_buffer.resize(batch.num_rows(), 0); + create_hashes( + &keys_values, + &self.random_state, + &mut self.hashes_buffer, + )?; + } self.join_metrics.input_batches.add(1); self.join_metrics.input_rows.add(batch.num_rows()); @@ -589,7 +596,9 @@ impl HashJoinStream { let timer = self.join_metrics.join_time.timer(); // if the left side is empty, we can skip the (potentially expensive) join operation - if build_side.left_data.hash_map.is_empty() && self.filter.is_none() { + let is_empty = build_side.left_data.map().is_empty(); + + if is_empty && self.filter.is_none() { let result = build_batch_empty_build_side( &self.schema, build_side.left_data.batch(), @@ -605,17 +614,34 @@ impl HashJoinStream { } // get the matched by join keys indices - let (left_indices, right_indices, next_offset) = lookup_join_hashmap( - build_side.left_data.hash_map(), - build_side.left_data.values(), - &state.values, - self.null_equality, - &self.hashes_buffer, - self.batch_size, - state.offset, - &mut self.probe_indices_buffer, - &mut self.build_indices_buffer, - )?; + let (left_indices, right_indices, next_offset) = match build_side.left_data.map() + { + Map::HashMap(map) => lookup_join_hashmap( + map.as_ref(), + build_side.left_data.values(), + &state.values, + self.null_equality, + &self.hashes_buffer, + self.batch_size, + state.offset, + &mut self.probe_indices_buffer, + &mut self.build_indices_buffer, + )?, + Map::ArrayMap(array_map) => { + let next_offset = array_map.get_matched_indices_with_limit_offset( + &state.values, + self.batch_size, + state.offset, + &mut self.probe_indices_buffer, + &mut self.build_indices_buffer, + )?; + ( + UInt64Array::from(self.build_indices_buffer.clone()), + UInt32Array::from(self.probe_indices_buffer.clone()), + next_offset, + ) + } + }; let distinct_right_indices_count = count_distinct_sorted_indices(&right_indices); diff --git a/datafusion/physical-plan/src/joins/join_hash_map.rs b/datafusion/physical-plan/src/joins/join_hash_map.rs index b0ed6dcc7c255..1729f2ba18165 100644 --- a/datafusion/physical-plan/src/joins/join_hash_map.rs +++ b/datafusion/physical-plan/src/joins/join_hash_map.rs @@ -119,10 +119,10 @@ pub trait JoinHashMapType: Send + Sync { &self, hash_values: &[u64], limit: usize, - offset: JoinHashMapOffset, + offset: MapOffset, input_indices: &mut Vec, match_indices: &mut Vec, - ) -> Option; + ) -> Option; /// Returns `true` if the join hash map contains no entries. fn is_empty(&self) -> bool; @@ -181,10 +181,10 @@ impl JoinHashMapType for JoinHashMapU32 { &self, hash_values: &[u64], limit: usize, - offset: JoinHashMapOffset, + offset: MapOffset, input_indices: &mut Vec, match_indices: &mut Vec, - ) -> Option { + ) -> Option { get_matched_indices_with_limit_offset::( &self.map, &self.next, @@ -255,10 +255,10 @@ impl JoinHashMapType for JoinHashMapU64 { &self, hash_values: &[u64], limit: usize, - offset: JoinHashMapOffset, + offset: MapOffset, input_indices: &mut Vec, match_indices: &mut Vec, - ) -> Option { + ) -> Option { get_matched_indices_with_limit_offset::( &self.map, &self.next, @@ -279,54 +279,8 @@ impl JoinHashMapType for JoinHashMapU64 { } } -// Type of offsets for obtaining indices from JoinHashMap. -pub(crate) type JoinHashMapOffset = (usize, Option); - -/// Traverses the chain of matching indices, collecting results up to the remaining limit. -/// Returns `Some(offset)` if the limit was reached and there are more results to process, -/// or `None` if the chain was fully traversed. -#[inline(always)] -fn traverse_chain( - next_chain: &[T], - input_idx: usize, - start_chain_idx: T, - remaining: &mut usize, - input_indices: &mut Vec, - match_indices: &mut Vec, - is_last_input: bool, -) -> Option -where - T: Copy + TryFrom + PartialOrd + Into + Sub, - >::Error: Debug, - T: ArrowNativeType, -{ - let zero = T::usize_as(0); - let one = T::usize_as(1); - let mut match_row_idx = start_chain_idx - one; - - loop { - match_indices.push(match_row_idx.into()); - input_indices.push(input_idx as u32); - *remaining -= 1; - - let next = next_chain[match_row_idx.into() as usize]; - - if *remaining == 0 { - // Limit reached - return offset for next call - return if is_last_input && next == zero { - // Finished processing the last input row - None - } else { - Some((input_idx, Some(next.into()))) - }; - } - if next == zero { - // End of chain - return None; - } - match_row_idx = next - one; - } -} +use crate::joins::MapOffset; +use crate::joins::chain::traverse_chain; pub fn update_from_iter<'a, T>( map: &mut HashTable<(u64, T)>, @@ -414,10 +368,10 @@ pub fn get_matched_indices_with_limit_offset( next_chain: &[T], hash_values: &[u64], limit: usize, - offset: JoinHashMapOffset, + offset: MapOffset, input_indices: &mut Vec, match_indices: &mut Vec, -) -> Option +) -> Option where T: Copy + TryFrom + PartialOrd + Into + Sub, >::Error: Debug, diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 3ff61ecf1dacc..848d0472fe885 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -27,6 +27,7 @@ use parking_lot::Mutex; pub use piecewise_merge_join::PiecewiseMergeJoinExec; pub use sort_merge_join::SortMergeJoinExec; pub use symmetric_hash_join::SymmetricHashJoinExec; +pub mod chain; mod cross_join; mod hash_join; mod nested_loop_join; @@ -36,6 +37,7 @@ mod stream_join_utils; mod symmetric_hash_join; pub mod utils; +mod array_map; mod join_filter; /// Hash map implementations for join operations. /// @@ -43,6 +45,31 @@ mod join_filter; /// and is not guaranteed to be stable across versions. pub mod join_hash_map; +use array_map::ArrayMap; +use utils::JoinHashMapType; + +pub enum Map { + HashMap(Box), + ArrayMap(ArrayMap), +} + +impl Map { + /// Returns the number of elements in the map. + pub fn num_of_distinct_key(&self) -> usize { + match self { + Map::HashMap(map) => map.len(), + Map::ArrayMap(array_map) => array_map.num_of_distinct_key(), + } + } + + /// Returns `true` if the map contains no elements. + pub fn is_empty(&self) -> bool { + self.num_of_distinct_key() == 0 + } +} + +pub(crate) type MapOffset = (usize, Option); + #[cfg(test)] pub mod test_utils; diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 22cc82a22db5f..63b60c85394c6 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -22,9 +22,9 @@ use std::collections::{HashMap, VecDeque}; use std::mem::size_of; use std::sync::Arc; +use crate::joins::MapOffset; use crate::joins::join_hash_map::{ - JoinHashMapOffset, get_matched_indices, get_matched_indices_with_limit_offset, - update_from_iter, + get_matched_indices, get_matched_indices_with_limit_offset, update_from_iter, }; use crate::joins::utils::{JoinFilter, JoinHashMapType}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricBuilder}; @@ -77,10 +77,10 @@ impl JoinHashMapType for PruningJoinHashMap { &self, hash_values: &[u64], limit: usize, - offset: JoinHashMapOffset, + offset: MapOffset, input_indices: &mut Vec, match_indices: &mut Vec, - ) -> Option { + ) -> Option { // Flatten the deque let next: Vec = self.next.iter().copied().collect(); get_matched_indices_with_limit_offset::( diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index aa5458849330f..a974a0a62de5d 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -2334,13 +2334,23 @@ async fn roundtrip_async_func_exec() -> Result<()> { /// it's a performance optimization filter, not a correctness requirement. #[test] fn roundtrip_hash_table_lookup_expr_to_lit() -> Result<()> { + use datafusion::physical_plan::joins::Map; + use datafusion::physical_plan::joins::{HashExpr, SeededRandomState}; + // Create a simple schema and input plan let schema = Arc::new(Schema::new(vec![Field::new("col", DataType::Int64, false)])); let input = Arc::new(EmptyExec::new(schema.clone())); // Create a HashTableLookupExpr - it will be replaced with lit(true) during serialization - let hash_map = Arc::new(JoinHashMapU32::with_capacity(0)); - let hash_expr: Arc = Arc::new(Column::new("col", 0)); + let hash_map = Arc::new(Map::HashMap(Box::new(JoinHashMapU32::with_capacity(0)))); + + // Create HashExpr instead of just a Column + let hash_expr = Arc::new(HashExpr::new( + vec![Arc::new(Column::new("col", 0))], + SeededRandomState::with_seeds(0, 0, 0, 0), + "test_hash".to_string(), + )); + let lookup_expr: Arc = Arc::new(HashTableLookupExpr::new( hash_expr, hash_map, diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 18f72cb9f7798..3952b2a2f76c0 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -260,6 +260,8 @@ datafusion.execution.parquet.statistics_enabled page datafusion.execution.parquet.statistics_truncate_length 64 datafusion.execution.parquet.write_batch_size 1024 datafusion.execution.parquet.writer_version 1.0 +datafusion.execution.perfect_hash_join_min_key_density 0.99 +datafusion.execution.perfect_hash_join_small_build_threshold 1024 datafusion.execution.planning_concurrency 13 datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 @@ -395,6 +397,8 @@ datafusion.execution.parquet.statistics_enabled page (writing) Sets if statistic datafusion.execution.parquet.statistics_truncate_length 64 (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting datafusion.execution.parquet.write_batch_size 1024 (writing) Sets write_batch_size in bytes datafusion.execution.parquet.writer_version 1.0 (writing) Sets parquet writer version valid values are "1.0" and "2.0" +datafusion.execution.perfect_hash_join_min_key_density 0.99 The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. +datafusion.execution.perfect_hash_join_small_build_threshold 1024 A perfect hash join (see `HashJoinExec` for more details) will be considered if the range of keys (max - min) on the build side is < this threshold. This provides a fast path for joins with very small key ranges, bypassing the density check. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. datafusion.execution.planning_concurrency 13 Fan-out during initial physical planning. This is mostly use to plan `UNION` children in parallel. Defaults to the number of CPU cores on the system datafusion.execution.skip_partial_aggregation_probe_ratio_threshold 0.8 Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input datafusion.execution.skip_partial_aggregation_probe_rows_threshold 100000 Number of input rows partial aggregation partition should process, before aggregation ratio check and trying to switch to skipping aggregation mode diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index c9222afe8ceb5..7c893a65d2e8f 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -74,6 +74,8 @@ The following configuration settings are available: | datafusion.catalog.has_header | true | Default value for `format.has_header` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. | | datafusion.catalog.newlines_in_values | false | Specifies whether newlines in (quoted) CSV values are supported. This is the default value for `format.newlines_in_values` for `CREATE EXTERNAL TABLE` if not specified explicitly in the statement. Parsing newlines in quoted values may be affected by execution behaviour such as parallel file scanning. Setting this to `true` ensures that newlines in values are parsed successfully, which may reduce performance. | | datafusion.execution.batch_size | 8192 | Default batch size while creating new batches, it's especially useful for buffer-in-memory batches since creating tiny batches would result in too much metadata memory consumption | +| datafusion.execution.perfect_hash_join_small_build_threshold | 1024 | A perfect hash join (see `HashJoinExec` for more details) will be considered if the range of keys (max - min) on the build side is < this threshold. This provides a fast path for joins with very small key ranges, bypassing the density check. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. | +| datafusion.execution.perfect_hash_join_min_key_density | 0.99 | The minimum required density of join keys on the build side to consider a perfect hash join (see `HashJoinExec` for more details). Density is calculated as: `(number of rows) / (max_key - min_key + 1)`. A perfect hash join may be used if the actual key density > this value. Currently only supports cases where build_side.num_rows() < u32::MAX. Support for build_side.num_rows() >= u32::MAX will be added in the future. | | datafusion.execution.coalesce_batches | true | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting | | datafusion.execution.collect_statistics | true | Should DataFusion collect statistics when first creating a table. Has no effect after the table is created. Applies to the default `ListingTableProvider` in DataFusion. Defaults to true. | | datafusion.execution.target_partitions | 0 | Number of partitions for query execution. Increasing partitions can increase concurrency. Defaults to the number of CPU cores on the system |