diff --git a/bolt/core/PlanNode.h b/bolt/core/PlanNode.h index 811b365e..ea463734 100644 --- a/bolt/core/PlanNode.h +++ b/bolt/core/PlanNode.h @@ -1695,7 +1695,8 @@ class HashJoinNode : public AbstractJoinNode { TypedExprPtr filter, PlanNodePtr left, PlanNodePtr right, - RowTypePtr outputType) + RowTypePtr outputType, + void* reusedHashTableAddress = nullptr) : AbstractJoinNode( id, joinType, @@ -1705,7 +1706,8 @@ class HashJoinNode : public AbstractJoinNode { std::move(left), std::move(right), std::move(outputType)), - nullAware_{nullAware} { + nullAware_{nullAware}, + reusedHashTableAddress_(reusedHashTableAddress) { if (nullAware) { BOLT_USER_CHECK( isNullAwareSupported(joinType), @@ -1764,6 +1766,10 @@ class HashJoinNode : public AbstractJoinNode { return nullAware_; } + void* reusedHashTableAddress() const { + return reusedHashTableAddress_; + } + folly::dynamic serialize() const override; static PlanNodePtr create(const folly::dynamic& obj, void* context); @@ -1772,6 +1778,8 @@ class HashJoinNode : public AbstractJoinNode { void addDetails(std::stringstream& stream) const override; const bool nullAware_; + + void* reusedHashTableAddress_; }; /// Represents inner/outer/semi/anti merge joins. 
diff --git a/bolt/exec/HashBuild.cpp b/bolt/exec/HashBuild.cpp index 2ad5df41..9452e6c5 100644 --- a/bolt/exec/HashBuild.cpp +++ b/bolt/exec/HashBuild.cpp @@ -103,7 +103,8 @@ HashBuild::HashBuild( driverCtx->queryConfig().skewRowCountRatioThreshold()), isDREnabled_(operatorCtx_->driverCtx() ->queryConfig() - .isDataRetentionUpdateEnabled()) { + .isDataRetentionUpdateEnabled()), + reusedHashTableAddress_(joinNode_->reusedHashTableAddress()) { BOLT_CHECK(pool()->trackUsage()); BOLT_CHECK_NOT_NULL(joinBridge_); @@ -117,60 +118,95 @@ } joinBridge_->addBuilder(); - auto inputType = joinNode_->sources()[1]->outputType(); - - const auto numKeys = joinNode_->rightKeys().size(); - keyChannels_.reserve(numKeys); - std::vector names; - names.reserve(inputType->size()); - std::vector types; - types.reserve(inputType->size()); - - for (int i = 0; i < numKeys; ++i) { - auto& key = joinNode_->rightKeys()[i]; - auto channel = exprToChannel(key.get(), inputType); - keyChannelMap_[channel] = i; - keyChannels_.emplace_back(channel); - names.emplace_back(inputType->nameOf(channel)); - types.emplace_back(inputType->childAt(channel)); - } - - maxHashTableBucketCount_ = - operatorCtx_->driverCtx()->queryConfig().maxHashTableSize(); - BOLT_CHECK(maxHashTableBucketCount_ > 1); - - dropDuplicates_ = canDropDuplicates(joinNode_); - addRuntimeStat( - "triggerDeduplicateInHashBuild", RuntimeCounter(dropDuplicates_ ? 1 : 0)); - - // Identify the non-key build side columns and make a decoder for each. - const int32_t numDependents = inputType->size() - numKeys; - if (!dropDuplicates_ && numDependents > 0) { - // Number of join keys (numKeys) may be less then number of input columns - // (inputType->size()). In this case numDependents is negative and cannot be - // used to call 'reserve'. This happens when we join different probe side - // keys with the same build side key: SELECT * FROM t LEFT JOIN u ON t.k1 = - // u.k AND t.k2 = u.k. 
- dependentChannels_.reserve(numDependents); - decoders_.reserve(numDependents); - } - if (!dropDuplicates_) { - // For left semi and anti join with no extra filter, hash table does not - // store dependent columns. - for (auto i = 0; i < inputType->size(); ++i) { - if (keyChannelMap_.find(i) == keyChannelMap_.end()) { - dependentChannels_.emplace_back(i); - decoders_.emplace_back(std::make_unique()); - names.emplace_back(inputType->nameOf(i)); - types.emplace_back(inputType->childAt(i)); + if (reusedHashTableAddress_ != nullptr) { + auto baseHashTable = + reinterpret_cast(reusedHashTableAddress_); + + BOLT_CHECK_NOT_NULL(joinBridge_); + joinBridge_->start(); + + if (baseHashTable->joinHasNullKeys() && isAntiJoin(joinType_) && + nullAware_ && !joinNode_->filter()) { + joinBridge_->setAntiJoinHasNullKeys(); + } else { + baseHashTable->prepareJoinTable({}); + + BOLT_CHECK_NOT_NULL(joinBridge_); + std::unique_ptr< + exec::BaseHashTable, + std::function> + hashTable(nullptr, [](exec::BaseHashTable* ptr) { /* Do nothing */ }); + + if (auto hasheTableWithNullKeys = + dynamic_cast*>(baseHashTable)) { + hashTable.reset(hasheTableWithNullKeys); + } else if ( + auto hasheTableWithoutNullKeys = + dynamic_cast*>(baseHashTable)) { + hashTable.reset(hasheTableWithoutNullKeys); + } else { + BOLT_UNREACHABLE("Unexpected HashTable {}", baseHashTable->toString()); } + + auto joinHashNullKeys = hashTable->joinHasNullKeys(); + joinBridge_->setHashTable(std::move(hashTable), joinHashNullKeys); + } + } else { + auto inputType = joinNode_->sources()[1]->outputType(); + + const auto numKeys = joinNode_->rightKeys().size(); + keyChannels_.reserve(numKeys); + std::vector names; + names.reserve(inputType->size()); + std::vector types; + types.reserve(inputType->size()); + + for (int i = 0; i < numKeys; ++i) { + auto& key = joinNode_->rightKeys()[i]; + auto channel = exprToChannel(key.get(), inputType); + keyChannelMap_[channel] = i; + keyChannels_.emplace_back(channel); + 
names.emplace_back(inputType->nameOf(channel)); + types.emplace_back(inputType->childAt(channel)); } - } - tableType_ = ROW(std::move(names), std::move(types)); - setupTable(); - setupSpiller(); - intermediateStateCleared_ = false; + maxHashTableBucketCount_ = + operatorCtx_->driverCtx()->queryConfig().maxHashTableSize(); + BOLT_CHECK(maxHashTableBucketCount_ > 1); + + dropDuplicates_ = canDropDuplicates(joinNode_); + addRuntimeStat( + "triggerDeduplicateInHashBuild", + RuntimeCounter(dropDuplicates_ ? 1 : 0)); + + // Identify the non-key build side columns and make a decoder for each. + const int32_t numDependents = inputType->size() - numKeys; + if (!dropDuplicates_ && numDependents > 0) { + // Number of join keys (numKeys) may be less then number of input columns + // (inputType->size()). In this case numDependents is negative and cannot + // be used to call 'reserve'. This happens when we join different probe + // side keys with the same build side key: SELECT * FROM t LEFT JOIN u ON + // t.k1 = u.k AND t.k2 = u.k. + dependentChannels_.reserve(numDependents); + decoders_.reserve(numDependents); + } + if (!dropDuplicates_) { + // For left semi and anti join with no extra filter, hash table does not + // store dependent columns. 
+ for (auto i = 0; i < inputType->size(); ++i) { + if (keyChannelMap_.find(i) == keyChannelMap_.end()) { + dependentChannels_.emplace_back(i); + decoders_.emplace_back(std::make_unique()); + names.emplace_back(inputType->nameOf(i)); + types.emplace_back(inputType->childAt(i)); + } + } + } + tableType_ = ROW(std::move(names), std::move(types)); + setupTable(); + setupSpiller(); + intermediateStateCleared_ = false; + } LOG(INFO) << name() << " HashBuild created for " << operatorCtx_->toString() << ", spill enabled: " << spillEnabled() @@ -911,6 +947,10 @@ void HashBuild::addAndClearSpillTarget(uint64_t& numRows, uint64_t& numBytes) { } void HashBuild::noMoreInput() { + if (reusedHashTableAddress_ != nullptr) { + return; + } + checkRunning(); if (noMoreInput_) { @@ -1371,6 +1411,9 @@ BlockingReason HashBuild::isBlocked(ContinueFuture* future) { } bool HashBuild::isFinished() { + if (reusedHashTableAddress_ != nullptr) { + return true; + } return state_ == State::kFinish; } diff --git a/bolt/exec/HashBuild.h b/bolt/exec/HashBuild.h index 4c01866d..81ae7a67 100644 --- a/bolt/exec/HashBuild.h +++ b/bolt/exec/HashBuild.h @@ -97,6 +97,9 @@ class HashBuild final : public Operator { } bool needsInput() const override { + if (reusedHashTableAddress_ != nullptr) { + return false; + } return !noMoreInput_; } @@ -447,6 +450,8 @@ class HashBuild final : public Operator { bool isDREnabled_{false}; int32_t maxHashTableBucketCount_{std::numeric_limits::max()}; std::shared_ptr rowFormatInfo_{nullptr}; + + void* reusedHashTableAddress_; }; inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) { diff --git a/bolt/exec/HashJoinBridge.cpp b/bolt/exec/HashJoinBridge.cpp index 6d746405..efc090f3 100644 --- a/bolt/exec/HashJoinBridge.cpp +++ b/bolt/exec/HashJoinBridge.cpp @@ -97,6 +97,24 @@ bool HashJoinBridge::setHashTable( return hasSpillData; } +void HashJoinBridge::setHashTable( + std::shared_ptr table, + bool hasNullKeys) { + BOLT_CHECK_NOT_NULL(table, "setHashTable 
called with null table"); + + std::vector promises; + { + std::lock_guard l(mutex_); + BOLT_CHECK(started_); + BOLT_CHECK(!buildResult_.has_value()); + BOLT_CHECK(restoringSpillShards_.empty()); + + buildResult_ = HashBuildResult(std::move(table), hasNullKeys); + promises = std::move(promises_); + } + notify(std::move(promises)); +} + void HashJoinBridge::setAntiJoinHasNullKeys() { std::vector promises; SpillPartitionSet spillPartitions; diff --git a/bolt/exec/HashJoinBridge.h b/bolt/exec/HashJoinBridge.h index cf2f8b71..e37f0b0f 100644 --- a/bolt/exec/HashJoinBridge.h +++ b/bolt/exec/HashJoinBridge.h @@ -60,6 +60,8 @@ class HashJoinBridge : public JoinBridge { bool hasNullKeys, SpillOffsetToBitsSet offsetToJoinBits = nullptr); + void setHashTable(std::shared_ptr table, bool hasNullKeys); + void setAntiJoinHasNullKeys(); /// Represents the result of HashBuild operators: a hash table, an optional @@ -83,6 +85,9 @@ class HashJoinBridge : public JoinBridge { HashBuildResult() : hasNullKeys(true) {} + HashBuildResult(std::shared_ptr _table, bool _hasNullKeys) + : hasNullKeys(_hasNullKeys), table(std::move(_table)) {} + bool hasNullKeys; std::shared_ptr table; std::optional restoredPartitionId; diff --git a/bolt/exec/HashProbe.cpp b/bolt/exec/HashProbe.cpp index 197afb87..89b66bdc 100644 --- a/bolt/exec/HashProbe.cpp +++ b/bolt/exec/HashProbe.cpp @@ -452,6 +452,8 @@ void HashProbe::prepareForSpillRestore() { // Reset the internal states which are relevant to the previous probe run. 
noMoreSpillInput_ = false; + // bolt current doesn't have HashProbe Spill function + // https://github.com/facebookincubator/velox/commit/2ea66c6204d8f1b27f7682111b335bd4de8ef6fc#diff-9a2573a9b3648b92c3e9342bfcb69ab4748e85e1bb18eea7cd08513e9911b532 table_.reset(); spiller_.reset(); if (!reuseSpillReader_) { diff --git a/bolt/exec/HashTable.cpp b/bolt/exec/HashTable.cpp index 10a17207..8a985401 100644 --- a/bolt/exec/HashTable.cpp +++ b/bolt/exec/HashTable.cpp @@ -28,13 +28,15 @@ * -------------------------------------------------------------------------- */ +#include #include #include #include +#include #include +#include #include -#include #include "bolt/common/base/AsyncSource.h" #include "bolt/common/base/Exceptions.h" #include "bolt/common/base/Portability.h" @@ -76,13 +78,15 @@ HashTable::HashTable( uint32_t minTableSizeForParallelJoinBuild, memory::MemoryPool* pool, const std::shared_ptr& stringArena, - bool enableJit) + bool enableJit, + bool reused) : BaseHashTable(std::move(hashers)), minTableSizeForParallelJoinBuild_(minTableSizeForParallelJoinBuild), isJoinBuild_(isJoinBuild), joinBuildNoDuplicates_(!allowDuplicates), hashMode_(mode), - enableJit_(enableJit) { + enableJit_(enableJit), + reused_(reused) { std::vector keys; for (auto& hasher : hashers_) { keys.push_back(hasher->type()); @@ -129,7 +133,8 @@ HashTable::HashTable( uint32_t minTableSizeForParallelJoinBuild, memory::MemoryPool* pool, const std::shared_ptr& stringArena, - bool enableJitRowEqVectors) + bool enableJitRowEqVectors, + bool reused) : HashTable( std::move(hashers), accumulators, @@ -141,7 +146,8 @@ HashTable::HashTable( minTableSizeForParallelJoinBuild, pool, stringArena, - enableJitRowEqVectors) {} + enableJitRowEqVectors, + reused) {} int32_t ProbeState::row() const { return row_; @@ -1781,77 +1787,81 @@ void HashTable::prepareJoinTable( folly::Executor* executor, bool dropDuplicates, int8_t spillInputStartPartitionBit) { - buildExecutor_ = executor; - if (dropDuplicates) { - if 
(table_ != nullptr) { - // Set table_ to nullptr to trigger rehash. - rows_->pool()->freeContiguous(tableAllocation_); - table_ = nullptr; - capacity_ = 0; - numDistinct_ = rows()->numRows(); - } - // Call analyze to insert all unique values in row container to the - // table hashers' uniqueValues_; - bool analyzeValue = !analyze(); - if (analyzeValue) { - if (hashMode_ != HashMode::kHash) { - setHashMode(HashMode::kHash, 0); - } else { - checkSize(0, true); + std::lock_guard l(mutex_); + if (!prepared_) { + buildExecutor_ = executor; + if (dropDuplicates) { + if (table_ != nullptr) { + // Set table_ to nullptr to trigger rehash. + rows_->pool()->freeContiguous(tableAllocation_); + table_ = nullptr; + capacity_ = 0; + numDistinct_ = rows()->numRows(); + } + // Call analyze to insert all unique values in row container to the + // table hashers' uniqueValues_; + bool analyzeValue = !analyze(); + if (analyzeValue) { + if (hashMode_ != HashMode::kHash) { + setHashMode(HashMode::kHash, 0); + } else { + checkSize(0, true); + } } } - } - otherTables_.reserve(tables.size()); - for (auto& table : tables) { - otherTables_.emplace_back(std::unique_ptr>( - dynamic_cast*>(table.release()))); - } - bool useValueIds = mayUseValueIds(*this); - if (useValueIds) { - for (auto& other : otherTables_) { - if (!mayUseValueIds(*other)) { - useValueIds = false; - break; - } + otherTables_.reserve(tables.size()); + for (auto& table : tables) { + otherTables_.emplace_back(std::unique_ptr>( + dynamic_cast*>(table.release()))); } + bool useValueIds = mayUseValueIds(*this); if (useValueIds) { for (auto& other : otherTables_) { - if (dropDuplicates) { - // Before merging with the current hashers, all values in the row - // containers of other table need to be inserted into uniqueValues_. 
- if (!other->analyze()) { - other->setHashMode(HashMode::kHash, 0); - useValueIds = false; - break; - } + if (!mayUseValueIds(*other)) { + useValueIds = false; + break; } - for (auto i = 0; i < hashers_.size(); ++i) { - hashers_[i]->merge(*other->hashers_[i]); - if (!hashers_[i]->mayUseValueIds()) { - useValueIds = false; + } + if (useValueIds) { + for (auto& other : otherTables_) { + if (dropDuplicates) { + // Before merging with the current hashers, all values in the row + // containers of other table need to be inserted into uniqueValues_. + if (!other->analyze()) { + other->setHashMode(HashMode::kHash, 0); + useValueIds = false; + break; + } + } + for (auto i = 0; i < hashers_.size(); ++i) { + hashers_[i]->merge(*other->hashers_[i]); + if (!hashers_[i]->mayUseValueIds()) { + useValueIds = false; + break; + } + } + if (!useValueIds) { break; } } - if (!useValueIds) { - break; - } } } - } - numDistinct_ = rows()->numRows(); + numDistinct_ = rows()->numRows(); - for (const auto& other : otherTables_) { - numDistinct_ += other->rows()->numRows(); - } - if (!useValueIds) { - if (hashMode_ != HashMode::kHash) { - setHashMode(HashMode::kHash, 0); + for (const auto& other : otherTables_) { + numDistinct_ += other->rows()->numRows(); + } + if (!useValueIds) { + if (hashMode_ != HashMode::kHash) { + setHashMode(HashMode::kHash, 0); + } else { + checkSize(0, true); + } } else { - checkSize(0, true); + decideHashMode(0); } - } else { - decideHashMode(0); + prepared_ = true; } checkHashBitsOverlap(spillInputStartPartitionBit); LOG(INFO) << __FUNCTION__ << ": capacity_ = " << capacity_ diff --git a/bolt/exec/HashTable.h b/bolt/exec/HashTable.h index 17ab5407..dbd27aa2 100644 --- a/bolt/exec/HashTable.h +++ b/bolt/exec/HashTable.h @@ -31,6 +31,8 @@ #pragma once #include +#include + #include "bolt/common/base/Portability.h" #include "bolt/exec/OneWayStatusFlag.h" #include "bolt/exec/RowContainer.h" @@ -329,6 +331,12 @@ class BaseHashTable { /// side. 
This is used for sizing the internal hash table. virtual uint64_t numDistinct() const = 0; + virtual bool reused() const = 0; + + virtual bool joinHasNullKeys() const = 0; + + virtual void setJoinHasNullKeys() = 0; + virtual float getDistinctRatio() const = 0; /// Return a number of current stats that can help with debugging and @@ -542,7 +550,8 @@ class HashTable : public BaseHashTable { uint32_t minTableSizeForParallelJoinBuild, memory::MemoryPool* pool, const std::shared_ptr& stringArena, - bool enableJitRowEqVectors); + bool enableJitRowEqVectors, + bool reused = false); HashTable( std::vector>&& hashers, @@ -555,7 +564,8 @@ class HashTable : public BaseHashTable { uint32_t minTableSizeForParallelJoinBuild, memory::MemoryPool* pool, const std::shared_ptr& stringArena, - bool enableJitRowEqVectors); + bool enableJitRowEqVectors, + bool reused = false); ~HashTable() override = default; @@ -586,7 +596,8 @@ class HashTable : public BaseHashTable { HashMode mode, uint32_t minTableSizeForParallelJoinBuild, memory::MemoryPool* pool, - bool jitRowEqVectors) { + bool jitRowEqVectors, + bool reused = false) { return std::make_unique( std::move(hashers), std::vector{}, @@ -598,7 +609,8 @@ class HashTable : public BaseHashTable { minTableSizeForParallelJoinBuild, pool, nullptr, - jitRowEqVectors); + jitRowEqVectors, + reused); } void groupProbe(HashLookup& lookup) override; @@ -669,6 +681,18 @@ class HashTable : public BaseHashTable { return numDistinct_; } + bool reused() const override { + return reused_; + } + + bool joinHasNullKeys() const override { + return joinHasNullKeys_; + } + + void setJoinHasNullKeys() override { + joinHasNullKeys_ = true; + } + float getDistinctRatio() const override { return numProbeInputs_ == 0 ? 
0 : numDistinct_ * 1.0 / numProbeInputs_; } @@ -1157,8 +1181,17 @@ class HashTable : public BaseHashTable { #ifdef ENABLE_BOLT_JIT bolt::jit::CompiledModuleSP jitModule_; bolt::jit::CompiledModuleSP jitModuleRow_; - #endif + + std::mutex mutex_; + + bool prepared_{false}; + + bool reused_{false}; + + // True if this is a build side of an anti or left semi project join and has + // at least one entry with null join keys. + bool joinHasNullKeys_{false}; }; } // namespace exec