@UBarney UBarney commented Jan 2, 2026

Which issue does this PR close?

  • Closes #.

Rationale for this change

The previous implementation of HashTableLookupExpr::evaluate relied on per-row calls to get_matched_indices, which incurred unnecessary performance overhead:

  1. Memory Overhead: Each per-row call triggered small Vec allocations and potential resizes, leading to pressure on the memory allocator.
  2. Redundant Computation: get_matched_indices traverses the entire hash chain to find all matches, which is unnecessary when we only need to verify the existence of a key.
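The difference can be illustrated with a minimal, self-contained sketch. The ChainedMap type and both method bodies below are illustrative stand-ins with hypothetical signatures, not the actual DataFusion join hash map:

```rust
use std::collections::HashMap;

/// Illustrative stand-in for a chained join hash map: `heads` maps a hash to
/// the 1-based index of the first build-side row with that hash, and
/// `next[i]` links further rows sharing the hash (0 terminates the chain).
struct ChainedMap {
    heads: HashMap<u64, u64>,
    next: Vec<u64>,
}

impl ChainedMap {
    /// Per-row style: walks the whole chain and allocates a fresh Vec on
    /// every call, even when the caller only needs to know existence.
    fn get_matched_indices(&self, hash: u64) -> Vec<u64> {
        let mut out = Vec::new();
        let mut i = self.heads.get(&hash).copied().unwrap_or(0);
        while i != 0 {
            out.push(i - 1);
            i = self.next[(i - 1) as usize];
        }
        out
    }

    /// Batch style: one pass over all probe hashes, writing into a single
    /// caller-provided bitmask; no chain walk and no per-row allocation.
    fn contain_hashes(&self, hashes: &[u64], mask: &mut [u8]) {
        for (row, h) in hashes.iter().enumerate() {
            if self.heads.contains_key(h) {
                mask[row / 8] |= 1 << (row % 8);
            }
        }
    }
}
```

For an existence check, the batch variant touches each probe hash once and writes a single bit, while the per-row variant pays a Vec allocation and a full chain walk for every probe row.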

Performance Results (TPC-H)

The following TPC-H results were obtained with DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true:

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ baseline@9a9ff ┃  optimized ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │      679.51 ms │  728.06 ms │  1.07x slower │
│ QQuery 2     │      388.33 ms │  384.11 ms │     no change │
│ QQuery 3     │      864.38 ms │  856.27 ms │     no change │
│ QQuery 4     │      458.46 ms │  468.26 ms │     no change │
│ QQuery 5     │     1614.26 ms │ 1525.65 ms │ +1.06x faster │
│ QQuery 6     │      611.20 ms │  610.06 ms │     no change │
│ QQuery 7     │      950.39 ms │  940.13 ms │     no change │
│ QQuery 8     │     1214.86 ms │ 1218.21 ms │     no change │
│ QQuery 9     │     2657.61 ms │ 2482.09 ms │ +1.07x faster │
│ QQuery 10    │     1050.70 ms │ 1001.96 ms │     no change │
│ QQuery 11    │      383.92 ms │  347.27 ms │ +1.11x faster │
│ QQuery 12    │      963.14 ms │  920.78 ms │     no change │
│ QQuery 13    │      473.68 ms │  480.97 ms │     no change │
│ QQuery 14    │      363.36 ms │  345.27 ms │     no change │
│ QQuery 15    │      960.56 ms │  955.05 ms │     no change │
│ QQuery 16    │      281.95 ms │  267.34 ms │ +1.05x faster │
│ QQuery 17    │     5306.43 ms │ 4983.21 ms │ +1.06x faster │
│ QQuery 18    │     3415.11 ms │ 3016.52 ms │ +1.13x faster │
│ QQuery 19    │      761.67 ms │  759.49 ms │     no change │
│ QQuery 20    │      650.20 ms │  642.40 ms │     no change │
│ QQuery 21    │     3111.85 ms │ 2833.05 ms │ +1.10x faster │
│ QQuery 22    │      141.75 ms │  143.06 ms │     no change │
└──────────────┴────────────────┴────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary             ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (baseline@9a9ff)   │ 27303.30ms │
│ Total Time (optimized)        │ 25909.21ms │
│ Average Time (baseline@9a9ff) │  1241.06ms │
│ Average Time (optimized)      │  1177.69ms │
│ Queries Faster                │          7 │
│ Queries Slower                │          1 │
│ Queries with No Change        │         14 │
│ Queries with Failure          │          0 │
└───────────────────────────────┴────────────┘

Note that Q1 does not involve HashJoin.

Note on Configuration

Benchmarks were conducted with DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS=true because HashTableLookupExpr::evaluate is NOT invoked under default settings.

I manually added dbg!(&num_rows) at L335 in partitioned_hash_eval.rs and confirmed that this code path is only triggered when the flag is enabled. Under default settings, HashTableLookupExpr::evaluate is not called; I am uncertain if this current behavior is intentional.

What changes are included in this PR?

  • Added JoinHashMapType::contain_hashes: A new trait method that processes
    a batch of hashes and updates a bitmask for existing keys.
  • Refactored HashTableLookupExpr::evaluate: Switched from per-row lookups to
    the new batch API.
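The evaluate-side change can be sketched as follows (hypothetical helper signature; the real code operates on Arrow arrays and a JoinHashMapType): hash the key columns once for the whole batch, issue a single batch lookup, then expand the bitmask into per-row booleans.

```rust
/// Hypothetical sketch: `contain_hashes` stands in for the new batch trait
/// method; it sets bit `i` of the mask when `hashes[i]` exists in the table.
fn evaluate_batch<F>(contain_hashes: F, hashes: &[u64]) -> Vec<bool>
where
    F: Fn(&[u64], &mut [u8]),
{
    // One bit per probe row, initialized to "no match".
    let mut mask = vec![0u8; (hashes.len() + 7) / 8];
    // A single batch call replaces `hashes.len()` per-row lookups.
    contain_hashes(hashes, &mut mask);
    // Expand the bitmask into per-row booleans.
    (0..hashes.len())
        .map(|row| (mask[row / 8] & (1 << (row % 8))) != 0)
        .collect()
}
```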

Are these changes tested?

Yes

Are there any user-facing changes?

No

@github-actions github-actions bot added the physical-plan Changes to the physical-plan crate label Jan 2, 2026
@UBarney UBarney marked this pull request as ready for review January 2, 2026 08:23
Copilot AI left a comment
Pull request overview

This PR optimizes HashTableLookupExpr::evaluate by replacing per-row hash table lookups with a batch processing API. The previous implementation made individual get_matched_indices calls for each row, causing unnecessary memory allocations and redundant computations. The new approach uses a single batch call that sets bits in a buffer for all matching hashes at once, resulting in notable performance improvements across multiple TPC-H queries (up to 1.13x faster on Q18).

Key Changes:

  • Introduced set_bits_if_exists trait method for batch hash lookups
  • Refactored HashTableLookupExpr::evaluate to use the new batch API
  • Added comprehensive test coverage for the new functionality

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated no comments.

  • datafusion/physical-plan/src/joins/join_hash_map.rs: Adds the set_bits_if_exists trait method to JoinHashMapType and implements it for JoinHashMapU32 and JoinHashMapU64, with a corresponding helper function and tests
  • datafusion/physical-plan/src/joins/stream_join_utils.rs: Implements set_bits_if_exists for PruningJoinHashMap to support the new batch lookup API
  • datafusion/physical-plan/src/joins/hash_join/partitioned_hash_eval.rs: Refactors HashTableLookupExpr::evaluate to use the new batch API instead of per-row lookups


@apache apache deleted a comment from alamb-ghbot Jan 2, 2026
        }
    }
    self.hash_map
        .set_bits_if_exists(hash_array.values(), buf.as_slice_mut());
Contributor:

I think we could use with_hashes / reuse hashes buffer instead of allocating a new one each time.

Contributor Author:

Done. Reusing the hashes buffer now. Thanks!

@Dandandan

Very nice! I added 2 suggestions which could maybe improve it even further.


adriangb commented Jan 2, 2026

Under default settings, HashTableLookupExpr::evaluate is not called; I am uncertain if this current behavior is intentional.

Yes, that's by design. It's only used when the filter is pushed down into the Parquet machinery and evaluated row by row. There's work to make that the default: #19477

@UBarney UBarney requested a review from Dandandan January 3, 2026 15:09

UBarney commented Jan 3, 2026

Very nice! I added 2 suggestions which could maybe improve it even further.

Thanks for reviewing! I've applied both suggestions. Could you please take another look? @Dandandan


// Optimization: if hash_expr is HashExpr, compute hashes directly into callback
// to avoid redundant allocations and copies.
if let Some(hash_expr) = self.hash_expr.as_any().downcast_ref::<HashExpr>() {
@Dandandan Dandandan Jan 3, 2026

Isn't this always the case? We can remove the HashExpr (only store the inner expressions) while it is constructed, to simplify the code?

Contributor Author:

That's a great point. Storing the inner expressions directly is indeed a better approach, especially since the perfect hash join (which is being worked on in a separate pending PR) needs direct access to on_columns to identify the join key columns.

I initially considered changing HashTableLookupExpr.hash_expr to a concrete HashExpr type (which I've actually already done in another unmerged PR for the perfect hash join). However, accessing the columns through hash_expr.on_columns feels a bit clunky 😂.

@github-actions github-actions bot added the proto Related to proto crate label Jan 4, 2026

adriangb commented Jan 4, 2026

It might be interesting to re-run https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/#hash-join-dynamic-filters and see if the numbers are even better now!


UBarney commented Jan 5, 2026

It might be interesting to re-run https://datafusion.apache.org/blog/2025/09/10/dynamic-filters/#hash-join-dynamic-filters and see if the numbers are even better now!

I re-ran the benchmark (SELECT * FROM small_table JOIN large_table ON small_table.k = large_table.k WHERE small_table.v >= 50;), but the execution time remains almost unchanged at around 7 ms. It seems the min-max filter (k@0 >= 50 AND k@0 <= 1000) is already filtering out the majority of the data, which is likely why we don't see a significant difference.

@UBarney UBarney mentioned this pull request Jan 6, 2026
@Dandandan Dandandan added this pull request to the merge queue Jan 6, 2026
Merged via the queue into apache:main with commit 5c2ee36 Jan 6, 2026
32 checks passed
@Dandandan

Thank you @UBarney!
