Skip to content

Commit

Permalink
fix: Remove support for evaluating sub-optimial filters in joins
Browse files Browse the repository at this point in the history
Summary:
This enables the hash join operator to throw an error when a filter is applied exclusively
to columns from one side of the join. This validation ensures the detection of
suboptimal plans, where filters are evaluated at the join operator instead of being
pushed to the upstream scan node of the respective join side for optimization.

Also adds a private API to ensure an internal state variable which is a buffer is
unique before any mutations are applied to it.

Differential Revision: D69085909
  • Loading branch information
Bikramjeet Vig authored and facebook-github-bot committed Feb 4, 2025
1 parent 2bb7aab commit e5bc251
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 6 deletions.
16 changes: 11 additions & 5 deletions velox/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ void HashProbe::initializeFilter(
filter_ =
std::make_unique<ExprSet>(std::move(filters), operatorCtx_->execCtx());

bool filterOnProbeColumns = false;
bool filterOnBuildColumns = false;
column_index_t filterChannel = 0;
std::vector<std::string> names;
std::vector<TypePtr> types;
Expand All @@ -216,6 +218,7 @@ void HashProbe::initializeFilter(
const auto& name = field->field();
auto channel = probeType->getChildIdxIfExists(name);
if (channel.has_value()) {
filterOnProbeColumns = true;
auto channelValue = channel.value();
filterInputProjections_.emplace_back(channelValue, filterChannel++);
names.emplace_back(probeType->nameOf(channelValue));
Expand All @@ -224,6 +227,7 @@ void HashProbe::initializeFilter(
}
channel = tableType->getChildIdxIfExists(name);
if (channel.has_value()) {
filterOnBuildColumns = true;
auto channelValue = channel.value();
filterTableProjections_.emplace_back(channelValue, filterChannel);
names.emplace_back(tableType->nameOf(channelValue));
Expand All @@ -234,6 +238,10 @@ void HashProbe::initializeFilter(
VELOX_FAIL(
"Join filter field {} not in probe or build input", field->toString());
}
VELOX_CHECK_EQ(
filterOnBuildColumns,
filterOnProbeColumns,
"Filter should be on both probe and build columns otherwise it should be pushed down.");

filterInputType_ = ROW(std::move(names), std::move(types));
}
Expand Down Expand Up @@ -1248,7 +1256,7 @@ void HashProbe::prepareFilterRowsForNullAwareJoin(
// with null join key columns(s) as we can apply filtering after they cross
// join with the table rows later.
if (!nonNullInputRows_.isAllSelected()) {
auto* rawMapping = outputRowMapping_->asMutable<vector_size_t>();
auto* rawMapping = getRawMutableOuputRowMapping();
for (int i = 0; i < numRows; ++i) {
if (filterInputRows_.isValid(i) &&
!nonNullInputRows_.isValid(rawMapping[i])) {
Expand Down Expand Up @@ -1339,8 +1347,7 @@ void HashProbe::applyFilterOnTableRowsForNullAwareJoin(
SelectivityVector HashProbe::evalFilterForNullAwareJoin(
vector_size_t numRows,
bool filterPropagateNulls) {
auto* rawOutputProbeRowMapping =
outputRowMapping_->asMutable<vector_size_t>();
auto* rawOutputProbeRowMapping = getRawMutableOuputRowMapping();

// Subset of probe-side rows with a match that passed the filter.
SelectivityVector filterPassedRows(input_->size(), false);
Expand Down Expand Up @@ -1398,8 +1405,6 @@ int32_t HashProbe::evalFilter(int32_t numRows) {
}

const bool filterPropagateNulls = filter_->expr(0)->propagatesNulls();
auto* rawOutputProbeRowMapping =
outputRowMapping_->asMutable<vector_size_t>();
auto* outputTableRows = outputTableRows_->asMutable<char*>();

filterInputRows_.resizeFill(numRows);
Expand Down Expand Up @@ -1431,6 +1436,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) {
decodedFilterResult_.decode(*filterResult_[0], filterInputRows_);

int32_t numPassed = 0;
auto* rawOutputProbeRowMapping = getRawMutableOuputRowMapping();
if (isLeftJoin(joinType_) || isFullJoin(joinType_)) {
// Identify probe rows which got filtered out and add them back with nulls
// for build side.
Expand Down
15 changes: 14 additions & 1 deletion velox/exec/HashProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,11 @@ class HashProbe : public Operator {
// the hash table.
void asyncWaitForHashTable();

// Sets up 'filter_' and related members.
// Sets up 'filter_' and related members. Throws if filter is applied
// exclusively to columns from one side of the join to ensure the detection
// of suboptimal plans, where filters are evaluated at the join operator
// instead of being pushed to the upstream scan node of the respective join
// side for optimization.
void initializeFilter(
const core::TypedExprPtr& filter,
const RowTypePtr& probeType,
Expand Down Expand Up @@ -327,6 +331,15 @@ class HashProbe : public Operator {
// of an invalidated table as we always spill the entire table.
std::optional<RowColumn::Stats> columnStats(int32_t columnIndex) const;

// Returns the raw pointer to the output row mapping buffer.
// Throws exception if the buffer is not mutable.
// Note: Use this method close to the block that modifies the buffer to ensure
// mutability does not change in-between.
vector_size_t* getRawMutableOuputRowMapping() {
VELOX_CHECK(outputRowMapping_->isMutable());
return outputRowMapping_->asMutable<vector_size_t>();
}

// TODO: Define batch size as bytes based on RowContainer row sizes.
const vector_size_t outputBatchSize_;

Expand Down
29 changes: 29 additions & 0 deletions velox/exec/tests/HashJoinTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3294,6 +3294,35 @@ TEST_P(MultiThreadedHashJoinTest, noSpillLevelLimit) {
.run();
}

TEST_F(HashJoinTest, filterOnASingleSide) {
// Verify that the hash join operator throws an error when a filter is applied
// exclusively to columns from one side of the join. This validation ensures
// the detection of suboptimal plans, where filters are evaluated at the join
// operator instead of being pushed to the upstream scan node of the
// respective join side for optimization.
auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
auto plan = PlanBuilder(planNodeIdGenerator)
.tableScan(ROW({"t0", "t1"}, {INTEGER(), BIGINT()}))
.hashJoin(
{"t0"},
{"u0"},
PlanBuilder(planNodeIdGenerator)
.tableScan(ROW({"u0", "u1"}, {INTEGER(), BIGINT()}))
.planNode(),
"t0 = 1",
{"t0", "u0"},
core::JoinType::kFull)
.planNode();
VELOX_ASSERT_THROW(
HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
.numDrivers(numDrivers_)
.planNode(plan)
.referenceQuery("SELECT t0, u0 FROM t, u WHERE t0 = 1")
.injectSpill(false)
.run(),
"Filter should be on both probe and build columns otherwise it should be pushed down.");
}

// Verify that dynamic filter pushed down is turned off for null-aware right
// semi project join.
TEST_F(HashJoinTest, nullAwareRightSemiProjectOverScan) {
Expand Down

0 comments on commit e5bc251

Please sign in to comment.