37 changes: 19 additions & 18 deletions .github/pull_request_template.md
@@ -27,15 +27,17 @@ Please verify that your change does not introduce performance regressions.
-->
- [ ] **No Impact**: This change does not affect the critical path (e.g., build system, doc, error handling).
- [ ] **Positive Impact**: I have run benchmarks.
<details>
<summary>Click to view Benchmark Results</summary>
<details>
<summary>Click to view Benchmark Results</summary>

```text
Paste your google-benchmark or TPC-H results here.
Before: 10.5s
After: 8.2s (+20%)
```

</details>

```text
Paste your google-benchmark or TPC-H results here.
Before: 10.5s
After: 8.2s (+20%)
```
</details>
- [ ] **Negative Impact**: Explained below (e.g., trade-off for correctness).

### Release Note
@@ -70,13 +72,12 @@ If yes, please describe how users should migrate.

- [ ] No
- [ ] Yes (Description: ...)
<details>
<summary>Click to view Breaking Changes</summary>

```text
Breaking Changes:
- Description of the breaking change.
- Possible solutions or workarounds.
- Any other relevant information.
```
</details>
<details>
<summary>Click to view Breaking Changes</summary>
```text
Breaking Changes:
- Description of the breaking change.
- Possible solutions or workarounds.
- Any other relevant information.
```
</details>
11 changes: 11 additions & 0 deletions Makefile
@@ -348,6 +348,17 @@ unittest_coverage: debug_with_test_cov #: Build with debugging and run unit tests
lcov --remove coverage.info '/usr/*' '*/.conan/data/*' '*/_build/*' '*/tests/*' '*/test/*' --output-file coverage_striped.info && \
genhtml --ignore-errors source coverage_striped.info --output-directory coverage

unittest_single:
ifndef TARGET
$(error TARGET is undefined. Usage: make unittest_single TARGET=TargetName [TEST=TestFilter])
endif
cmake --build $(BUILD_BASE_DIR)/Release --target $(TARGET) -j $(NUM_THREADS)
ifneq ($(TEST),)
export GTEST_FILTER="$(TEST)" && ctest --test-dir $(BUILD_BASE_DIR)/Release -R "$(TARGET)" --output-on-failure --timeout 7200
else
ctest --test-dir $(BUILD_BASE_DIR)/Release -R "$(TARGET)" --output-on-failure --timeout 7200
endif

hdfstest: hdfs-debug-build #: Build with debugging, hdfs enabled and run hdfs tests
ctest --test-dir $(BUILD_BASE_DIR)/Debug -j ${NUM_THREADS} --output-on-failure -R bolt_hdfs_file_test

2 changes: 1 addition & 1 deletion bolt/connectors/Connector.h
@@ -412,7 +412,7 @@ class AsyncThreadCtx {
void disallowPreload() {
if (adaptive_ && allowPreload_.load()) {
allowPreload_ = false;
LOG(WARNING) << "Disallow scan preload due to limited memory";
LOG(INFO) << "Disallow scan preload due to limited memory";
Collaborator: Disallowing preload is unexpected, so I prefer a warning message.

Collaborator (Author): This log appears excessively when running queries like TPC-DS in the CLI. Using WARNING causes significant noise in the console (especially in spark-sql/shell), so I switched to INFO to keep it from polluting the console output while ensuring it is still recorded in the log files.

}
}
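
For context on the WARNING vs INFO discussion above: with a glog-style backend (which the LOG macros here suggest), whether a message also reaches the console is typically governed by the stderr threshold, while every enabled severity is still written to the log files. A minimal sketch, assuming glog and that the CLI environment routes WARNING-and-above to stderr (the actual threshold used by spark-sql/shell is an assumption):

```cpp
#include <glog/logging.h>

int main(int argc, char** argv) {
  google::InitGoogleLogging(argv[0]);

  // Severities at or above this value are also copied to stderr;
  // everything enabled is still written to the log files.
  FLAGS_stderrthreshold = google::GLOG_WARNING;

  LOG(INFO) << "Disallow scan preload due to limited memory";     // log file only
  LOG(WARNING) << "Disallow scan preload due to limited memory";  // log file + console

  return 0;
}
```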

35 changes: 35 additions & 0 deletions bolt/core/QueryConfig.h
@@ -652,6 +652,21 @@ class QueryConfig {
static constexpr const char* kSkewRowCountRatioThreshold =
"hash_join_skewed_row_count_ratio";

static constexpr const char* kEnableHashJoinArrayRecluster =
"enable_hash_join_array_recluster";

static constexpr const char* kHashJoinArrayReclusterMode =
"hash_join_array_recluster_mode";

static constexpr const char* kHashJoinArrayReclusterDuplicateRatioThreshold =
"hash_join_array_recluster_duplicate_ratio_threshold";

static constexpr const char* kHashJoinArrayReclusterMinProbeRowNumber =
"hash_join_array_recluster_min_probe_row_number";

static constexpr const char* kHashJoinArrayReclusterMinDistinctRowNumber =
"hash_join_array_recluster_min_distinct_row_number";

// -1 means print all exceptions, usually for debugging
// 0 means disable all exceptions,
// 1 means print exceptions whose prefix is in the white list(default)
@@ -1490,6 +1505,26 @@ class QueryConfig {
return get<bool>(kHashJoinSkewedPartitionEnabled, true);
}

bool hashJoinArrayReclusterEnabled() const {
return get<bool>(kEnableHashJoinArrayRecluster, true);
}

std::string hashJoinArrayReclusterMode() const {
return get<std::string>(kHashJoinArrayReclusterMode, "hash");
}

int64_t hashJoinArrayReclusterDuplicateRatioThreshold() const {
return get<int64_t>(kHashJoinArrayReclusterDuplicateRatioThreshold, 128);
}

int64_t hashJoinArrayReclusterMinProbeRowNumber() const {
return get<int64_t>(kHashJoinArrayReclusterMinProbeRowNumber, 500000);
}

int64_t hashJoinArrayReclusterMinDistinctRowNumber() const {
return get<int64_t>(kHashJoinArrayReclusterMinDistinctRowNumber, 32);
}

int32_t skewFileSizeRatioThreshold() const {
return get<int32_t>(kSkewFileSizeRatioThreshold, 10);
}
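
For reference, the knobs added above could be wired up through query configuration values along these lines. This is a hedged sketch: the keys mirror the constants in this hunk and the values match the getter defaults, but how Bolt actually constructs a QueryConfig from raw string values (e.g. via session properties) is an assumption.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical helper: returns the recluster-related settings as key/value
// strings, using the defaults shown in the accessors above.
std::unordered_map<std::string, std::string> reclusterSettings() {
  return {
      // Master switch for array-mode hash join reclustering.
      {"enable_hash_join_array_recluster", "true"},
      // Presumably "hash" (default) or "sort".
      {"hash_join_array_recluster_mode", "hash"},
      // Recluster only when rows / max distinct key values >= this ratio...
      {"hash_join_array_recluster_duplicate_ratio_threshold", "128"},
      // ...and the probe side is estimated at >= this many rows...
      {"hash_join_array_recluster_min_probe_row_number", "500000"},
      // ...and there are at least this many distinct key values.
      {"hash_join_array_recluster_min_distinct_row_number", "32"},
  };
}
```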
16 changes: 13 additions & 3 deletions bolt/exec/HashBuild.cpp
@@ -183,6 +183,7 @@ void HashBuild::initialize() {
if (isAntiJoin(joinType_) && joinNode_->filter()) {
setupFilterForAntiJoins(keyChannelMap_);
}
table_->setNumEstimatedProbeRows(joinBridge_->numEstimatedProbeRows());
}

void HashBuild::setupTable() {
@@ -203,6 +204,12 @@ void HashBuild::setupTable() {
dependentTypes.emplace_back(tableType_->childAt(i));
}
auto& queryConfig = operatorCtx_->driverCtx()->queryConfig();
HashTableReclusterConfig hashTableReclusterConfig(
queryConfig.hashJoinArrayReclusterDuplicateRatioThreshold(),
queryConfig.hashJoinArrayReclusterMinProbeRowNumber(),
queryConfig.hashJoinArrayReclusterMinDistinctRowNumber(),
queryConfig.hashJoinArrayReclusterMode(),
queryConfig.hashJoinArrayReclusterEnabled());
if (joinNode_->isRightJoin() || joinNode_->isFullJoin() ||
joinNode_->isRightSemiProjectJoin()) {
// Do not ignore null keys.
@@ -215,7 +222,8 @@
: BaseHashTable::HashMode::kArray,
queryConfig.minTableRowsForParallelJoinBuild(),
pool(),
queryConfig.enableJitRowEqVectors());
queryConfig.enableJitRowEqVectors(),
hashTableReclusterConfig);
} else {
// Right semi join needs to tag build rows that were probed.
const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
@@ -231,7 +239,8 @@
: BaseHashTable::HashMode::kArray,
queryConfig.minTableRowsForParallelJoinBuild(),
pool(),
queryConfig.enableJitRowEqVectors());
queryConfig.enableJitRowEqVectors(),
hashTableReclusterConfig);
} else {
// Ignore null keys
table_ = HashTable<true>::createForJoin(
@@ -243,7 +252,8 @@
: BaseHashTable::HashMode::kArray,
queryConfig.minTableRowsForParallelJoinBuild(),
pool(),
queryConfig.enableJitRowEqVectors());
queryConfig.enableJitRowEqVectors(),
hashTableReclusterConfig);
}
}
lookup_ = std::make_unique<HashLookup>(
10 changes: 10 additions & 0 deletions bolt/exec/HashJoinBridge.h
@@ -134,6 +134,14 @@ class HashJoinBridge : public JoinBridge {
}
}

uint64_t numEstimatedProbeRows() const {
return numEstimatedProbeRows_;
}

void setNumEstimatedProbeRows(uint64_t numEstimatedProbeRows) {
numEstimatedProbeRows_ = numEstimatedProbeRows;
}

private:
uint32_t numBuilders_{0};

@@ -158,6 +166,8 @@
// This set can grow if HashBuild operator cannot load full partition in
// memory and engages in recursive spilling.
SpillPartitionSet spillPartitionSets_;

uint64_t numEstimatedProbeRows_{0};
};

// Indicates if 'joinNode' is null-aware anti or left semi project join type and
3 changes: 3 additions & 0 deletions bolt/exec/HashProbe.cpp
@@ -158,6 +158,9 @@ HashProbe::HashProbe(

void HashProbe::initialize() {
Operator::initialize();
uint64_t totalRowCnt{0};
operatorCtx_->traverseOpToGetRowCount(totalRowCnt);
joinBridge_->setNumEstimatedProbeRows(totalRowCnt);
auto jitRowEqVectors =
operatorCtx_->driverCtx()->queryConfig().enableJitRowEqVectors();
BOLT_CHECK(hashers_.empty());
140 changes: 139 additions & 1 deletion bolt/exec/HashTable.cpp
@@ -820,7 +820,12 @@ void HashTable<ignoreNullKeys>::allocateTables(uint64_t size) {
BOLT_CHECK_GT(size, 0);
capacity_ = size;
const uint64_t byteSize = capacity_ * tableSlotSize();
BOLT_CHECK_EQ(byteSize % kBucketSize, 0);
BOLT_CHECK_EQ(
byteSize % kBucketSize,
0,
"byteSize: {}, kBucketSize: {}, ",
byteSize,
kBucketSize);
numTombstones_ = 0;
sizeMask_ = byteSize - 1;
numBuckets_ = byteSize / kBucketSize;
@@ -1550,6 +1555,35 @@ void HashTable<ignoreNullKeys>::clearUseRange(std::vector<bool>& useRange) {
}
}

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::tryRecluster() {
if (!reclusterConfig_.enableArrayRecluster) {
return;
}

// Note that maxDistinctNumber is the maximum number of distinct values
// across all hashers, whereas numDistinct_ is not the number of DISTINCT
// values in the row container.

if (numDistinct_ >= numEstimatedProbeRows_ ||
numEstimatedProbeRows_ < reclusterConfig_.minProbeRowNumber) {
return;
}
size_t maxDistinctNumber = 0;

for (auto& hasher : hashers_) {
maxDistinctNumber = std::max(maxDistinctNumber, hasher->numUniqueValues());
}
int64_t duplicateRatio =
maxDistinctNumber > 0 ? numDistinct_ / maxDistinctNumber : 0;
if (duplicateRatio < reclusterConfig_.duplicateRatioThreshold ||
maxDistinctNumber < reclusterConfig_.minDistinctRowNumber) {
return;
}

reclusterDataByKey();
}

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::decideHashMode(
int32_t numNew,
@@ -1775,6 +1809,107 @@ bool mayUseValueIds(const BaseHashTable& table) {
}
} // namespace

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::reclusterDataByKey() {
if (!isJoinBuild_) {
LOG(INFO) << "reclusterDataByKey: joinBuild_ is false";
return;
}
if (rows_->numRows() == 0) {
LOG(INFO) << "reclusterDataByKey: numRows is 0";
return;
}
if (rows_->keyTypes().empty()) {
LOG(INFO) << "reclusterDataByKey: keyTypes is empty";
return;
}
if (rows_->accumulators().size() > 0) {
LOG(INFO) << "reclusterDataByKey: accumulators is not empty";
return;
}
if (sorted_) {
LOG(INFO) << "reclusterDataByKey: sorted_ is true";
return;
}

if (hashMode_ != HashMode::kArray) {
LOG(INFO) << "reclusterDataByKey: hashMode_ is not kArray";
return;
}

auto numRows = rows_->numRows();
std::vector<char*> sortedRows(numRows);

RowContainerIterator iter;
rows_->listRows(&iter, numRows, sortedRows.data());

if (reclusterConfig_.reclusterMode ==
HashTableReclusterConfig::ReclusterMode::kSort) {
HybridSorter sorter{SortAlgo::kAuto};
sorter.template sort(
sortedRows.begin(),
sortedRows.end(),
[this](const char* leftRow, const char* rightRow) {
return rows_->compareRows(leftRow, rightRow) < 0;
});
} else if (
reclusterConfig_.reclusterMode ==
HashTableReclusterConfig::ReclusterMode::kHash) {
Comment on lines +1855 to +1857

Collaborator: Try to reduce memory cost?

Collaborator (Author): No, the goal isn't to save memory. Both methods are designed to improve cache locality during the probe phase by physically reordering the RowContainer.

The sort-based method guarantees that identical keys are stored contiguously, but it introduces significant overhead, roughly 7x slower than the hash-based approach.
The hash-based method is a trade-off: while it doesn't guarantee strict contiguity like sorting, it effectively clusters similar keys with much lower computational overhead, giving a better balance between build time and probe performance.

Collaborator: I mean the memory cost here needs to be reduced.

Collaborator (Author): In-place reclustering is much slower than copying the clustered data and then replacing the RowContainer.

std::vector<uint64_t> rowHashes(numRows);
folly::Range<char**> rowRange(sortedRows.data(), numRows);

for (size_t i = 0; i < rows_->keyTypes().size(); ++i) {
bool mix = (i > 0);
rows_->hash(i, rowRange, mix, rowHashes.data());
}

folly::F14FastMap<uint64_t, size_t> counts;
counts.reserve(4096);
for (uint64_t h : rowHashes) {
counts[h]++;
}

folly::F14FastMap<uint64_t, size_t> offsets;
offsets.reserve(counts.size());

size_t currentOffset = 0;
for (auto& kv : counts) {
offsets[kv.first] = currentOffset;
currentOffset += kv.second;
}
std::vector<char*> result(numRows);

for (int32_t i = 0; i < numRows; ++i) {
uint64_t h = rowHashes[i];

size_t pos = offsets[h]++;

result[pos] = sortedRows[i];
}
sortedRows = std::move(result);
} else {
LOG(ERROR) << "reclusterDataByKey: unknown reclusterMode: "
<< static_cast<int>(reclusterConfig_.reclusterMode);
return;
}

rows_ = std::move(rows_->cloneByOrder(sortedRows));
if (table_ != nullptr) {
rows_->pool()->freeContiguous(tableAllocation_);
table_ = nullptr;
}
numTombstones_ = 0;

for (size_t i = 0; i < otherTables_.size(); ++i) {
otherTables_[i]->reclusterDataByKey();
}
capacity_ = bits::nextPowerOfTwo(
std::max(static_cast<uint64_t>(numRows), kBucketSize));
allocateTables(capacity_);
rehash(true);
sorted_ = true;
}
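
To make the trade-off from the review thread above concrete, here is a minimal, self-contained sketch of the two strategies over a plain vector of rows. The `Row` type and single key column are hypothetical; the real code operates on RowContainer row pointers and mixes hashes across all key columns.

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <unordered_map>
#include <vector>

struct Row {
  int64_t key;  // hypothetical single join key
};

// Strategy 1: sort by key. Identical keys end up strictly contiguous,
// but the O(n log n) comparison sort is the expensive path.
void reclusterBySort(std::vector<Row>& rows) {
  std::sort(rows.begin(), rows.end(),
            [](const Row& a, const Row& b) { return a.key < b.key; });
}

// Strategy 2: counting pass over key hashes. Rows with the same hash are
// grouped together (equal keys always share a hash), without a full sort.
void reclusterByHash(std::vector<Row>& rows) {
  std::vector<uint64_t> hashes(rows.size());
  for (size_t i = 0; i < rows.size(); ++i) {
    hashes[i] = std::hash<int64_t>{}(rows[i].key);
  }

  // Count rows per hash, then turn the counts into group start offsets.
  std::unordered_map<uint64_t, size_t> counts;
  for (uint64_t h : hashes) {
    ++counts[h];
  }
  std::unordered_map<uint64_t, size_t> offsets;
  size_t next = 0;
  for (auto& [h, c] : counts) {
    offsets[h] = next;
    next += c;
  }

  // Scatter each row to its group's next slot; groups are contiguous in 'out'.
  std::vector<Row> out(rows.size());
  for (size_t i = 0; i < rows.size(); ++i) {
    out[offsets[hashes[i]]++] = rows[i];
  }
  rows = std::move(out);  // copy then replace, mirroring the PR's approach
}
```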

template <bool ignoreNullKeys>
void HashTable<ignoreNullKeys>::prepareJoinTable(
std::vector<std::unique_ptr<BaseHashTable>> tables,
@@ -1852,6 +1987,9 @@ void HashTable<ignoreNullKeys>::prepareJoinTable(
}
} else {
decideHashMode(0);
if (hashMode_ == HashMode::kArray) {
tryRecluster();
}
}
checkHashBitsOverlap(spillInputStartPartitionBit);
LOG(INFO) << __FUNCTION__ << ": capacity_ = " << capacity_