Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions bolt/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -1695,7 +1695,8 @@ class HashJoinNode : public AbstractJoinNode {
TypedExprPtr filter,
PlanNodePtr left,
PlanNodePtr right,
RowTypePtr outputType)
RowTypePtr outputType,
void* reusedHashTableAddress = nullptr)
: AbstractJoinNode(
id,
joinType,
Expand All @@ -1705,7 +1706,8 @@ class HashJoinNode : public AbstractJoinNode {
std::move(left),
std::move(right),
std::move(outputType)),
nullAware_{nullAware} {
nullAware_{nullAware},
reusedHashTableAddress_(reusedHashTableAddress) {
if (nullAware) {
BOLT_USER_CHECK(
isNullAwareSupported(joinType),
Expand Down Expand Up @@ -1764,6 +1766,10 @@ class HashJoinNode : public AbstractJoinNode {
return nullAware_;
}

/// Returns the raw hash table address passed to the constructor, or nullptr
/// (the constructor default) when no table was supplied.
/// NOTE(review): a raw address is only meaningful inside the process that
/// produced it, so a node carrying a non-null value presumably cannot be
/// serialized to another process or machine — confirm how serialize() and
/// create() treat this field.
void* reusedHashTableAddress() const {
return reusedHashTableAddress_;
}

folly::dynamic serialize() const override;

static PlanNodePtr create(const folly::dynamic& obj, void* context);
Expand All @@ -1772,6 +1778,8 @@ class HashJoinNode : public AbstractJoinNode {
void addDetails(std::stringstream& stream) const override;

const bool nullAware_;

// Raw address of an existing hash table to reuse, or nullptr when none was
// given to the constructor.
// NOTE(review): presumably owned by the caller, not by this node — confirm.
// A raw address is process-local, so a node with a non-null value cannot be
// meaningfully serialized across processes or machines; consider a CHECK in
// serialize().
void* reusedHashTableAddress_;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make HashJoinNode impossible to serialize across processes and machines, so it would be better to add a comment or a CHECK.

};

/// Represents inner/outer/semi/anti merge joins. Translates to an
Expand Down
147 changes: 95 additions & 52 deletions bolt/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ HashBuild::HashBuild(
driverCtx->queryConfig().skewRowCountRatioThreshold()),
isDREnabled_(operatorCtx_->driverCtx()
->queryConfig()
.isDataRetentionUpdateEnabled()) {
.isDataRetentionUpdateEnabled()),
reusedHashTableAddress_(joinNode_->reusedHashTableAddress()) {
BOLT_CHECK(pool()->trackUsage());
BOLT_CHECK_NOT_NULL(joinBridge_);

Expand All @@ -117,60 +118,95 @@ HashBuild::HashBuild(
}
joinBridge_->addBuilder();

auto inputType = joinNode_->sources()[1]->outputType();

const auto numKeys = joinNode_->rightKeys().size();
keyChannels_.reserve(numKeys);
std::vector<std::string> names;
names.reserve(inputType->size());
std::vector<TypePtr> types;
types.reserve(inputType->size());

for (int i = 0; i < numKeys; ++i) {
auto& key = joinNode_->rightKeys()[i];
auto channel = exprToChannel(key.get(), inputType);
keyChannelMap_[channel] = i;
keyChannels_.emplace_back(channel);
names.emplace_back(inputType->nameOf(channel));
types.emplace_back(inputType->childAt(channel));
}

maxHashTableBucketCount_ =
operatorCtx_->driverCtx()->queryConfig().maxHashTableSize();
BOLT_CHECK(maxHashTableBucketCount_ > 1);

dropDuplicates_ = canDropDuplicates(joinNode_);
addRuntimeStat(
"triggerDeduplicateInHashBuild", RuntimeCounter(dropDuplicates_ ? 1 : 0));

// Identify the non-key build side columns and make a decoder for each.
const int32_t numDependents = inputType->size() - numKeys;
if (!dropDuplicates_ && numDependents > 0) {
// Number of join keys (numKeys) may be less then number of input columns
// (inputType->size()). In this case numDependents is negative and cannot be
// used to call 'reserve'. This happens when we join different probe side
// keys with the same build side key: SELECT * FROM t LEFT JOIN u ON t.k1 =
// u.k AND t.k2 = u.k.
dependentChannels_.reserve(numDependents);
decoders_.reserve(numDependents);
}
if (!dropDuplicates_) {
// For left semi and anti join with no extra filter, hash table does not
// store dependent columns.
for (auto i = 0; i < inputType->size(); ++i) {
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
dependentChannels_.emplace_back(i);
decoders_.emplace_back(std::make_unique<DecodedVector>());
names.emplace_back(inputType->nameOf(i));
types.emplace_back(inputType->childAt(i));
if (reusedHashTableAddress_ != nullptr) {
auto baseHashTable =
reinterpret_cast<exec::BaseHashTable*>(reusedHashTableAddress_);

BOLT_CHECK_NOT_NULL(joinBridge_);
joinBridge_->start();

if (baseHashTable->joinHasNullKeys() && isAntiJoin(joinType_) &&
nullAware_ && !joinNode_->filter()) {
joinBridge_->setAntiJoinHasNullKeys();
} else {
baseHashTable->prepareJoinTable({});

BOLT_CHECK_NOT_NULL(joinBridge_);
std::unique_ptr<
exec::BaseHashTable,
std::function<void(exec::BaseHashTable*)>>
hashTable(nullptr, [](exec::BaseHashTable* ptr) { /* Do nothing */ });
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little hacky.


if (auto hasheTableWithNullKeys =
dynamic_cast<exec::HashTable<true>*>(baseHashTable)) {
hashTable.reset(hasheTableWithNullKeys);
} else if (
auto hasheTableWithoutNullKeys =
dynamic_cast<exec::HashTable<false>*>(baseHashTable)) {
hashTable.reset(hasheTableWithoutNullKeys);
} else {
BOLT_UNREACHABLE("Unexpected HashTable {}", baseHashTable->toString());
}

auto joinHashNullKeys = hashTable->joinHasNullKeys();
joinBridge_->setHashTable(std::move(hashTable), joinHashNullKeys);
}
} else {
auto inputType = joinNode_->sources()[1]->outputType();

const auto numKeys = joinNode_->rightKeys().size();
keyChannels_.reserve(numKeys);
std::vector<std::string> names;
names.reserve(inputType->size());
std::vector<TypePtr> types;
types.reserve(inputType->size());

for (int i = 0; i < numKeys; ++i) {
auto& key = joinNode_->rightKeys()[i];
auto channel = exprToChannel(key.get(), inputType);
keyChannelMap_[channel] = i;
keyChannels_.emplace_back(channel);
names.emplace_back(inputType->nameOf(channel));
types.emplace_back(inputType->childAt(channel));
}
}

tableType_ = ROW(std::move(names), std::move(types));
setupTable();
setupSpiller();
intermediateStateCleared_ = false;
maxHashTableBucketCount_ =
operatorCtx_->driverCtx()->queryConfig().maxHashTableSize();
BOLT_CHECK(maxHashTableBucketCount_ > 1);

dropDuplicates_ = canDropDuplicates(joinNode_);
addRuntimeStat(
"triggerDeduplicateInHashBuild",
RuntimeCounter(dropDuplicates_ ? 1 : 0));

// Identify the non-key build side columns and make a decoder for each.
const int32_t numDependents = inputType->size() - numKeys;
if (!dropDuplicates_ && numDependents > 0) {
// Number of join keys (numKeys) may be less then number of input columns
// (inputType->size()). In this case numDependents is negative and cannot
// be used to call 'reserve'. This happens when we join different probe
// side keys with the same build side key: SELECT * FROM t LEFT JOIN u ON
// t.k1 = u.k AND t.k2 = u.k.
dependentChannels_.reserve(numDependents);
decoders_.reserve(numDependents);
}
if (!dropDuplicates_) {
// For left semi and anti join with no extra filter, hash table does not
// store dependent columns.
for (auto i = 0; i < inputType->size(); ++i) {
if (keyChannelMap_.find(i) == keyChannelMap_.end()) {
dependentChannels_.emplace_back(i);
decoders_.emplace_back(std::make_unique<DecodedVector>());
names.emplace_back(inputType->nameOf(i));
types.emplace_back(inputType->childAt(i));
}
}
}
tableType_ = ROW(std::move(names), std::move(types));
setupTable();
setupSpiller();
intermediateStateCleared_ = false;
}

LOG(INFO) << name() << " HashBuild created for " << operatorCtx_->toString()
<< ", spill enabled: " << spillEnabled()
Expand Down Expand Up @@ -911,6 +947,10 @@ void HashBuild::addAndClearSpillTarget(uint64_t& numRows, uint64_t& numBytes) {
}

void HashBuild::noMoreInput() {
if (reusedHashTableAddress_ != nullptr) {
return;
}

checkRunning();

if (noMoreInput_) {
Expand Down Expand Up @@ -1371,6 +1411,9 @@ BlockingReason HashBuild::isBlocked(ContinueFuture* future) {
}

// Reports whether this operator has finished. An operator that was given an
// existing hash table address (reusedHashTableAddress_ is non-null) is always
// reported as finished; otherwise it is finished once state_ is
// State::kFinish.
bool HashBuild::isFinished() {
  const bool reusesExistingTable = reusedHashTableAddress_ != nullptr;
  return reusesExistingTable || state_ == State::kFinish;
}

Expand Down
5 changes: 5 additions & 0 deletions bolt/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class HashBuild final : public Operator {
}

// Returns true while the operator still accepts input: never when an existing
// hash table address was supplied (reusedHashTableAddress_ is non-null), and
// otherwise until noMoreInput_ has been set.
bool needsInput() const override {
  return reusedHashTableAddress_ == nullptr && !noMoreInput_;
}

Expand Down Expand Up @@ -447,6 +450,8 @@ class HashBuild final : public Operator {
bool isDREnabled_{false};
int32_t maxHashTableBucketCount_{std::numeric_limits<int32_t>::max()};
std::shared_ptr<RowFormatInfo> rowFormatInfo_{nullptr};

// Raw address of an existing hash table taken from the join node
// (joinNode_->reusedHashTableAddress()); nullptr when no table is reused.
// Default-initialized like the neighbouring members so the field is never
// left indeterminate.
void* reusedHashTableAddress_{nullptr};
};

inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) {
Expand Down
18 changes: 18 additions & 0 deletions bolt/exec/HashJoinBridge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,24 @@ bool HashJoinBridge::setHashTable(
return hasSpillData;
}

void HashJoinBridge::setHashTable(
std::shared_ptr<BaseHashTable> table,
bool hasNullKeys) {
BOLT_CHECK_NOT_NULL(table, "setHashTable called with null table");

std::vector<ContinuePromise> promises;
{
std::lock_guard<std::mutex> l(mutex_);
BOLT_CHECK(started_);
BOLT_CHECK(!buildResult_.has_value());
BOLT_CHECK(restoringSpillShards_.empty());

buildResult_ = HashBuildResult(std::move(table), hasNullKeys);
promises = std::move(promises_);
}
notify(std::move(promises));
}

void HashJoinBridge::setAntiJoinHasNullKeys() {
std::vector<ContinuePromise> promises;
SpillPartitionSet spillPartitions;
Expand Down
5 changes: 5 additions & 0 deletions bolt/exec/HashJoinBridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class HashJoinBridge : public JoinBridge {
bool hasNullKeys,
SpillOffsetToBitsSet offsetToJoinBits = nullptr);

void setHashTable(std::shared_ptr<BaseHashTable> table, bool hasNullKeys);

void setAntiJoinHasNullKeys();

/// Represents the result of HashBuild operators: a hash table, an optional
Expand All @@ -83,6 +85,9 @@ class HashJoinBridge : public JoinBridge {

HashBuildResult() : hasNullKeys(true) {}

/// Builds a result from an existing hash table and its null-key flag. Only
/// 'hasNullKeys' and 'table' are initialized here; '_table' is moved into the
/// member, and every other member keeps its default.
HashBuildResult(std::shared_ptr<BaseHashTable> _table, bool _hasNullKeys)
: hasNullKeys(_hasNullKeys), table(std::move(_table)) {}

bool hasNullKeys;
std::shared_ptr<BaseHashTable> table;
std::optional<SpillPartitionId> restoredPartitionId;
Expand Down
2 changes: 2 additions & 0 deletions bolt/exec/HashProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,8 @@ void HashProbe::prepareForSpillRestore() {

// Reset the internal states which are relevant to the previous probe run.
noMoreSpillInput_ = false;
// Bolt currently doesn't have the HashProbe spill function; see:
// https://github.com/facebookincubator/velox/commit/2ea66c6204d8f1b27f7682111b335bd4de8ef6fc#diff-9a2573a9b3648b92c3e9342bfcb69ab4748e85e1bb18eea7cd08513e9911b532
table_.reset();
spiller_.reset();
if (!reuseSpillReader_) {
Expand Down
Loading