Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions bolt/benchmarks/tpch/TpchBenchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,14 @@ class TpchBenchmark : public QueryBenchmarkBase {
}
}
runStats.rawInputBytes = rawInputBytes;
// Use endTimeMs (when all drivers finish) instead of executionEndTimeMs
// (when all splits finish) to include time spent in downstream operators
// like OrderBy's finish phase.
const auto endTime =
stats.endTimeMs > 0 ? stats.endTimeMs : stats.executionEndTimeMs;
out << fmt::format(
"Execution time: {}",
succinctMillis(
stats.executionEndTimeMs - stats.executionStartTimeMs))
succinctMillis(endTime - stats.executionStartTimeMs))
<< std::endl;
out << fmt::format(
"Splits total: {}, finished: {}",
Expand Down
25 changes: 25 additions & 0 deletions bolt/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -578,6 +578,16 @@ class QueryConfig {
static constexpr const char* kParquetRepDefMemoryLimit =
"parquet_repdef_memory_limit";

/// If true, enables hybrid join mode in HashBuild/HashJoin. Defaults to
/// false (see hybridJoinEnabled()).
static constexpr const char* kHybridJoinEnabled = "hybrid_join_enabled";

/// If true, reorder rows by containerId during hybrid join extraction for
/// better cache locality. Can be disabled for testing to get deterministic
/// output order.
static constexpr const char* kHybridJoinReorderEnabled =
"hybrid_join_reorder_enabled";

/// If true, enables hybrid sort mode. Defaults to false (see
/// hybridSortEnabled()).
static constexpr const char* kHybridSortEnabled = "hybrid_sort_enabled";

/**
* LLVM JIT enabled
* -1 : enable all jit (by default)
Expand Down Expand Up @@ -988,6 +998,21 @@ class QueryConfig {
#endif
}

/// Returns whether hybrid join mode is enabled (kHybridJoinEnabled).
/// Defaults to false.
bool hybridJoinEnabled() const {
return get<bool>(kHybridJoinEnabled, false);
}

/// Whether hybrid-join payload rows are reordered by containerId during
/// extraction. Defaults to true, which gives better cache locality; tests
/// may turn it off to obtain a deterministic output order.
bool hybridJoinReorderEnabled() const {
  static constexpr bool kDefaultReorder = true;
  return get<bool>(kHybridJoinReorderEnabled, kDefaultReorder);
}

/// Returns whether hybrid sort mode is enabled (kHybridSortEnabled).
/// Defaults to false.
bool hybridSortEnabled() const {
return get<bool>(kHybridSortEnabled, false);
}

/// Returns 'is aggregation spilling enabled' flag. Must also check the
/// spillEnabled()!
bool aggregationSpillEnabled() const {
Expand Down
126 changes: 106 additions & 20 deletions bolt/exec/HashBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,13 @@ HashBuild::HashBuild(

// Identify the non-key build side columns and make a decoder for each.
const int32_t numDependents = inputType->size() - numKeys;
std::vector<std::string> dependentNames;
std::vector<TypePtr> dependentTypes;

hybridJoin_ = operatorCtx_->driverCtx()->queryConfig().hybridJoinEnabled() &&
numDependents > 0 && !joinNode_->isLeftSemiFilterJoin() &&
!joinNode_->isLeftSemiProjectJoin() && !joinNode_->isAntiJoin();

if (!dropDuplicates_ && numDependents > 0) {
// Number of join keys (numKeys) may be less then number of input columns
// (inputType->size()). In this case numDependents is negative and cannot be
Expand All @@ -153,6 +160,8 @@ HashBuild::HashBuild(
// u.k AND t.k2 = u.k.
dependentChannels_.reserve(numDependents);
decoders_.reserve(numDependents);
dependentNames.reserve(numDependents);
dependentTypes.reserve(numDependents);
}
if (!dropDuplicates_) {
// For left semi and anti join with no extra filter, hash table does not
Expand All @@ -163,18 +172,30 @@ HashBuild::HashBuild(
decoders_.emplace_back(std::make_unique<DecodedVector>());
names.emplace_back(inputType->nameOf(i));
types.emplace_back(inputType->childAt(i));
dependentNames.emplace_back(inputType->nameOf(i));
dependentTypes.emplace_back(inputType->childAt(i));
}
}
}

tableType_ = ROW(std::move(names), std::move(types));
dependentTypes_ = ROW(std::move(dependentNames), std::move(dependentTypes));
driverId_ = driverCtx->driverId;
if (hybridJoin_) {
BOLT_CHECK_LE(
driverId_,
255,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why hardcode limit to 255?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently we store a BIGINT (64-bit) rowId where the top 8 bits represent the driverId and the remaining 56 bits represent the per-driver rowId. So the maximum number of drivers it supports is 255. Maybe we can make it a config.

"driverId {} exceeds maximum 255 for hybrid join mode",
driverId_);
}
setupTable();
setupSpiller();
intermediateStateCleared_ = false;

LOG(INFO) << name() << " HashBuild created for " << operatorCtx_->toString()
<< ", spill enabled: " << spillEnabled()
<< ", maxHashTableSize = " << maxHashTableBucketCount_;
<< ", maxHashTableSize = " << maxHashTableBucketCount_
<< ", hybrid mode " << (hybridJoin_ ? "enabled" : "disabled");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo disabled

}

void HashBuild::initialize() {
Expand Down Expand Up @@ -215,7 +236,8 @@ void HashBuild::setupTable() {
: BaseHashTable::HashMode::kArray,
queryConfig.minTableRowsForParallelJoinBuild(),
pool(),
queryConfig.enableJitRowEqVectors());
queryConfig.enableJitRowEqVectors(),
hybridJoin_);
} else {
// Right semi join needs to tag build rows that were probed.
const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
Expand All @@ -231,7 +253,8 @@ void HashBuild::setupTable() {
: BaseHashTable::HashMode::kArray,
queryConfig.minTableRowsForParallelJoinBuild(),
pool(),
queryConfig.enableJitRowEqVectors());
queryConfig.enableJitRowEqVectors(),
hybridJoin_);
} else {
// Ignore null keys
table_ = HashTable<true>::createForJoin(
Expand All @@ -243,13 +266,27 @@ void HashBuild::setupTable() {
: BaseHashTable::HashMode::kArray,
queryConfig.minTableRowsForParallelJoinBuild(),
pool(),
queryConfig.enableJitRowEqVectors());
queryConfig.enableJitRowEqVectors(),
hybridJoin_);
}
}
lookup_ = std::make_unique<HashLookup>(
table_->hashers(), queryConfig.enableJitRowEqVectors());
lookup_->reset(1);
analyzeKeys_ = table_->hashMode() != BaseHashTable::HashMode::kHash;

if (hybridJoin_) {
table_->hybridData()->setId(static_cast<uint8_t>(driverId_));
// Initialize allContainers_ with itself so spilling can work before table
// merge.
std::unordered_map<uint8_t, HybridContainer*> selfContainer;
selfContainer[static_cast<uint8_t>(driverId_)] = table_->hybridData();
table_->hybridData()->setAllContainers(selfContainer);
// Set reorder flag from query config - can be disabled for deterministic
// testing.
table_->hybridData()->setReorderEnabled(
queryConfig.hybridJoinReorderEnabled());
}
}

void HashBuild::setupSpiller(
Expand All @@ -264,7 +301,8 @@ void HashBuild::setupSpiller(

const auto& spillConfig = spillConfig_.value();
bool canUseRowBasedSpill = joinBridge_->numBuilders() == 1 &&
operatorCtx_->task()->numDrivers(operatorCtx_->driverCtx()) == 1; // TODO
operatorCtx_->task()->numDrivers(operatorCtx_->driverCtx()) == 1 &&
!hybridJoin_; // Disable row-based spill for hybrid join mode
if (canUseRowBasedSpill) {
*const_cast<common::RowBasedSpillMode*>(&spillConfig.rowBasedSpillMode) =
common::strToRowBasedSpillMode(
Expand Down Expand Up @@ -349,6 +387,12 @@ void HashBuild::setupSpiller(
spiller_->setSkewThreshold(
skewFileSizeRatioThreshold_, skewRowCountRatioThreshold_);

// Enable hybrid mode for spiller if hybrid join is enabled.
// Works for both initial spill and repartition spill during restoration.
if (hybridJoin_ && table_->hybridData()) {
spiller_->setHybridMode(true, table_->hybridData());
}

const int32_t numPartitions = spiller_->hashBits().numPartitions();
spillLevel_ = spillConfig.joinSpillLevel(offsetTojoinBits_);
spillInputIndicesBuffers_.resize(numPartitions);
Expand Down Expand Up @@ -572,21 +616,46 @@ void HashBuild::addInput(RowVectorPtr input) {
}
auto rows = table_->rows();
auto nextOffset = rows->nextOffset();
activeRows_.applyToSelected([&](auto rowIndex) {
char* newRow = rows->newRow();
if (nextOffset) {
*reinterpret_cast<char**>(newRow + nextOffset) = nullptr;
}
// Store the columns for each row in sequence. At probe time
// strings of the row will probably be in consecutive places, so
// reading one will prime the cache for the next.
for (auto i = 0; i < hashers.size(); ++i) {
rows->store(hashers[i]->decodedVector(), rowIndex, newRow, i);
}
for (auto i = 0; i < dependentChannels_.size(); ++i) {
rows->store(*decoders_[i], rowIndex, newRow, i + hashers.size());
}
});

if (hybridJoin_) {
activeRows_.applyToSelected([&](auto rowIndex) {
char* newRow = rows->newRow();
if (nextOffset) {
*reinterpret_cast<char**>(newRow + nextOffset) = nullptr;
}
// Store the columns for each row in sequence. At probe time
// strings of the row will probably be in consecutive places, so
// reading one will prime the cache for the next.
for (auto i = 0; i < hashers.size(); ++i) {
rows->store(hashers[i]->decodedVector(), rowIndex, newRow, i);
}
// Store RowId
auto baseRow = table_->hybridData()->getNumRows();
uint64_t encodedId = (static_cast<uint64_t>(driverId_)
<< 56) | // top 8 bits: driverId [0, 255]
(static_cast<uint64_t>(rowIndex + baseRow) & ((1ULL << 56) - 1));
rows->storeSingleRowId(encodedId, newRow);
});
auto payloadInput = wrapColumns(
input->as<RowVector>(), dependentChannels_, dependentTypes_, pool());
table_->hybridData()->addPayload(std::move(payloadInput));
} else {
activeRows_.applyToSelected([&](auto rowIndex) {
char* newRow = rows->newRow();
if (nextOffset) {
*reinterpret_cast<char**>(newRow + nextOffset) = nullptr;
}
// Store the columns for each row in sequence. At probe time
// strings of the row will probably be in consecutive places, so
// reading one will prime the cache for the next.
for (auto i = 0; i < hashers.size(); ++i) {
rows->store(hashers[i]->decodedVector(), rowIndex, newRow, i);
}
for (auto i = 0; i < dependentChannels_.size(); ++i) {
rows->store(*decoders_[i], rowIndex, newRow, i + hashers.size());
}
});
}
spillRowBasedInput();
}

Expand Down Expand Up @@ -897,6 +966,11 @@ void HashBuild::runSpill(const std::vector<Operator*>& spillOperators) {
// run in parallel.
for (auto& spillOp : spillOperators) {
HashBuild* build = dynamic_cast<HashBuild*>(spillOp);
// Coalesce batches before spilling to ensure hybrid data is properly laid
// out.
if (build->hybridJoin_ && build->table_->hybridData()) {
build->table_->hybridData()->coalesceBatches();
}
build->spiller_->spill();
build->table_->clear();
build->pool()->release();
Expand All @@ -922,6 +996,13 @@ void HashBuild::noMoreInput() {
}

void HashBuild::noMoreInputInternal() {
// Coalesce batches in this driver's HybridContainer before merging with
// peers. This handles both the normal path (from noMoreInput) and spill
// restore path (from processSpillInput). Each driver does this independently.
if (hybridJoin_ && table_->hybridData()) {
table_->hybridData()->coalesceBatches();
}

if (spillEnabled()) {
spillGroup_->operatorStopped(*this);
}
Expand Down Expand Up @@ -1516,6 +1597,11 @@ void HashBuild::reclaim(
spillTasks.push_back(
std::make_shared<AsyncSource<SpillResult>>([buildOp]() {
try {
// Coalesce batches before spilling to ensure hybrid data is
// properly laid out.
if (buildOp->hybridJoin_ && buildOp->table_->hybridData()) {
buildOp->table_->hybridData()->coalesceBatches();
}
buildOp->spiller_->spill();
buildOp->table_->clear();
// Release the minimum reserved memory.
Expand Down
6 changes: 6 additions & 0 deletions bolt/exec/HashBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ class HashBuild final : public Operator {

// Non-key channels in 'input_'.
std::vector<column_index_t> dependentChannels_;
RowTypePtr dependentTypes_;

// Corresponds 1:1 to 'dependentChannels_'.
std::vector<std::unique_ptr<DecodedVector>> decoders_;
Expand Down Expand Up @@ -447,6 +448,11 @@ class HashBuild final : public Operator {
bool isDREnabled_{false};
int32_t maxHashTableBucketCount_{std::numeric_limits<int32_t>::max()};
std::shared_ptr<RowFormatInfo> rowFormatInfo_{nullptr};

// For hybrid join
bool hybridJoin_{false};
int driverId_;
std::unique_ptr<HybridContainer> hybridData_;
};

inline std::ostream& operator<<(std::ostream& os, HashBuild::State state) {
Expand Down
Loading
Loading