diff --git a/velox/exec/HashProbe.cpp b/velox/exec/HashProbe.cpp
index c8454da3a06f..0f59d6c28aa1 100644
--- a/velox/exec/HashProbe.cpp
+++ b/velox/exec/HashProbe.cpp
@@ -206,6 +206,8 @@ void HashProbe::initializeFilter(
   filter_ =
       std::make_unique<ExprSet>(std::move(filters), operatorCtx_->execCtx());
 
+  bool filterOnProbeColumns = false;
+  bool filterOnBuildColumns = false;
   column_index_t filterChannel = 0;
   std::vector<std::string> names;
   std::vector<TypePtr> types;
@@ -216,6 +218,7 @@ void HashProbe::initializeFilter(
     const auto& name = field->field();
     auto channel = probeType->getChildIdxIfExists(name);
     if (channel.has_value()) {
+      filterOnProbeColumns = true;
       auto channelValue = channel.value();
       filterInputProjections_.emplace_back(channelValue, filterChannel++);
       names.emplace_back(probeType->nameOf(channelValue));
@@ -224,6 +227,7 @@ void HashProbe::initializeFilter(
     }
     channel = tableType->getChildIdxIfExists(name);
     if (channel.has_value()) {
+      filterOnBuildColumns = true;
       auto channelValue = channel.value();
       filterTableProjections_.emplace_back(channelValue, filterChannel);
       names.emplace_back(tableType->nameOf(channelValue));
@@ -234,6 +238,10 @@ void HashProbe::initializeFilter(
     VELOX_FAIL(
         "Join filter field {} not in probe or build input", field->toString());
   }
+  VELOX_CHECK_EQ(
+      filterOnBuildColumns,
+      filterOnProbeColumns,
+      "Filter should be on both probe and build columns otherwise it should be pushed down.");
 
   filterInputType_ = ROW(std::move(names), std::move(types));
 }
@@ -1248,7 +1256,7 @@ void HashProbe::prepareFilterRowsForNullAwareJoin(
   // with null join key columns(s) as we can apply filtering after they cross
   // join with the table rows later.
   if (!nonNullInputRows_.isAllSelected()) {
-    auto* rawMapping = outputRowMapping_->asMutable<vector_size_t>();
+    auto* rawMapping = getRawMutableOutputRowMapping();
     for (int i = 0; i < numRows; ++i) {
       if (filterInputRows_.isValid(i) &&
           !nonNullInputRows_.isValid(rawMapping[i])) {
@@ -1339,8 +1347,7 @@ void HashProbe::applyFilterOnTableRowsForNullAwareJoin(
 SelectivityVector HashProbe::evalFilterForNullAwareJoin(
     vector_size_t numRows,
     bool filterPropagateNulls) {
-  auto* rawOutputProbeRowMapping =
-      outputRowMapping_->asMutable<vector_size_t>();
+  auto* rawOutputProbeRowMapping = getRawMutableOutputRowMapping();
 
   // Subset of probe-side rows with a match that passed the filter.
   SelectivityVector filterPassedRows(input_->size(), false);
@@ -1398,8 +1405,6 @@ int32_t HashProbe::evalFilter(int32_t numRows) {
   }
 
   const bool filterPropagateNulls = filter_->expr(0)->propagatesNulls();
-  auto* rawOutputProbeRowMapping =
-      outputRowMapping_->asMutable<vector_size_t>();
   auto* outputTableRows = outputTableRows_->asMutable<char*>();
 
   filterInputRows_.resizeFill(numRows);
@@ -1431,6 +1436,7 @@ int32_t HashProbe::evalFilter(int32_t numRows) {
   decodedFilterResult_.decode(*filterResult_[0], filterInputRows_);
 
   int32_t numPassed = 0;
+  auto* rawOutputProbeRowMapping = getRawMutableOutputRowMapping();
   if (isLeftJoin(joinType_) || isFullJoin(joinType_)) {
     // Identify probe rows which got filtered out and add them back with nulls
     // for build side.
diff --git a/velox/exec/HashProbe.h b/velox/exec/HashProbe.h
index eef2b802a49c..e808150e3b60 100644
--- a/velox/exec/HashProbe.h
+++ b/velox/exec/HashProbe.h
@@ -101,7 +101,11 @@ class HashProbe : public Operator {
   // the hash table.
   void asyncWaitForHashTable();
 
-  // Sets up 'filter_' and related members.
+  // Sets up 'filter_' and related members.
+  //
+  // Throws if the filter references columns from only one side of the join.
+  // This detects suboptimal plans where the filter is evaluated at the join
+  // operator instead of being pushed down to the scan node of that side.
   void initializeFilter(
       const core::TypedExprPtr& filter,
       const RowTypePtr& probeType,
@@ -327,6 +331,15 @@ class HashProbe : public Operator {
   // of an invalidated table as we always spill the entire table.
   std::optional<RowColumn::Stats> columnStats(int32_t columnIndex) const;
 
+  // Returns the raw pointer to the output row mapping buffer. Throws if the
+  // buffer is not mutable.
+  // NOTE: call this method close to the code that modifies the buffer so that
+  // mutability cannot change in between.
+  vector_size_t* getRawMutableOutputRowMapping() {
+    VELOX_CHECK(outputRowMapping_->isMutable());
+    return outputRowMapping_->asMutable<vector_size_t>();
+  }
+
   // TODO: Define batch size as bytes based on RowContainer row sizes.
   const vector_size_t outputBatchSize_;
 
diff --git a/velox/exec/tests/HashJoinTest.cpp b/velox/exec/tests/HashJoinTest.cpp
index 650ef294bae2..e61c475d1ed9 100644
--- a/velox/exec/tests/HashJoinTest.cpp
+++ b/velox/exec/tests/HashJoinTest.cpp
@@ -3294,6 +3294,35 @@ TEST_P(MultiThreadedHashJoinTest, noSpillLevelLimit) {
       .run();
 }
 
+TEST_F(HashJoinTest, filterOnASingleSide) {
+  // Verify that the hash join operator throws when a filter references
+  // columns from only one side of the join. This validation detects
+  // suboptimal plans where the filter is evaluated at the join operator
+  // instead of being pushed down to the upstream scan node of the
+  // corresponding join side.
+  auto planNodeIdGenerator = std::make_shared<core::PlanNodeIdGenerator>();
+  auto plan = PlanBuilder(planNodeIdGenerator)
+                  .tableScan(ROW({"t0", "t1"}, {INTEGER(), BIGINT()}))
+                  .hashJoin(
+                      {"t0"},
+                      {"u0"},
+                      PlanBuilder(planNodeIdGenerator)
+                          .tableScan(ROW({"u0", "u1"}, {INTEGER(), BIGINT()}))
+                          .planNode(),
+                      "t0 = 1",
+                      {"t0", "u0"},
+                      core::JoinType::kFull)
+                  .planNode();
+  VELOX_ASSERT_THROW(
+      HashJoinBuilder(*pool_, duckDbQueryRunner_, driverExecutor_.get())
+          .numDrivers(numDrivers_)
+          .planNode(plan)
+          .referenceQuery("SELECT t0, u0 FROM t, u WHERE t0 = 1")
+          .injectSpill(false)
+          .run(),
+      "Filter should be on both probe and build columns otherwise it should be pushed down.");
+}
+
 // Verify that dynamic filter pushed down is turned off for null-aware right
 // semi project join.
 TEST_F(HashJoinTest, nullAwareRightSemiProjectOverScan) {
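The new VELOX_CHECK_EQ in initializeFilter() reduces to a simple rule: a join filter must either reference columns from both the probe and the build side, or reference no columns at all; a filter confined to a single side should have been pushed down to that side's scan. Below is a minimal standalone sketch of that rule, not Velox code: the function filterTouchesBothSides and its inputs are hypothetical, using the schemas from the new filterOnASingleSide test for illustration.

#include <algorithm>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical helper: true if the filter references columns from both sides,
// or from neither side (a constant filter).
bool filterTouchesBothSides(
    const std::vector<std::string>& filterFields,
    const std::vector<std::string>& probeColumns,
    const std::vector<std::string>& buildColumns) {
  auto contains = [](const std::vector<std::string>& columns,
                     const std::string& name) {
    return std::find(columns.begin(), columns.end(), name) != columns.end();
  };

  bool onProbe = false;
  bool onBuild = false;
  for (const auto& field : filterFields) {
    if (contains(probeColumns, field)) {
      onProbe = true;
    } else if (contains(buildColumns, field)) {
      onBuild = true;
    } else {
      // Mirrors the existing VELOX_FAIL for unknown filter fields.
      throw std::runtime_error("Join filter field not in probe or build input: " + field);
    }
  }
  // Mirrors VELOX_CHECK_EQ(filterOnBuildColumns, filterOnProbeColumns, ...).
  return onProbe == onBuild;
}

int main() {
  // The filter "t0 = 1" from the new test references only the probe column
  // t0, so the check flags the plan as suboptimal (returns false here).
  return filterTouchesBothSides({"t0"}, {"t0", "t1"}, {"u0", "u1"}) ? 0 : 1;
}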
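The other change replaces scattered outputRowMapping_->asMutable calls with getRawMutableOutputRowMapping(), which asserts that the buffer is still mutable right before a raw pointer is taken. The following is a minimal standalone sketch of that idea only, not Velox code: RowMappingBuffer and rawMutableRowMapping are hypothetical stand-ins, and single ownership of a shared_ptr is used here as a rough analogue of Velox's buffer mutability check.

#include <cassert>
#include <memory>
#include <vector>

// Hypothetical stand-in for the output row mapping buffer.
using RowMappingBuffer = std::shared_ptr<std::vector<int32_t>>;

// Analogue of getRawMutableOutputRowMapping(): hand out a writable pointer
// only after confirming the buffer is uniquely owned, i.e. still safe to
// mutate at this exact point in the code.
int32_t* rawMutableRowMapping(const RowMappingBuffer& buffer) {
  assert(buffer.use_count() == 1 && "row mapping buffer must not be shared");
  return buffer->data();
}

int main() {
  auto mapping = std::make_shared<std::vector<int32_t>>(8, 0);

  // Fetch the raw pointer immediately before the write, as the helper's
  // comment in HashProbe.h recommends; intervening steps could otherwise
  // share the buffer and make mutation unsafe.
  auto* raw = rawMutableRowMapping(mapping);
  raw[0] = 42;
  return 0;
}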