From bc48b0f8992f601c20aa8b2591f24431bb391420 Mon Sep 17 00:00:00 2001 From: yangzhengguo Date: Mon, 22 Dec 2025 19:01:05 +0800 Subject: [PATCH] [Optimization] Recluster RowContainer in HashJoin Array Mode to improve cache locality Introduces an optimization for Hash Join in Array mode. When the build side has low cardinality but a high number of rows (high duplication factor), the linked lists in the hash table often point to scattered memory locations in the RowContainer. This causes severe cache thrashing during the probe phase. --- .github/pull_request_template.md | 37 +++---- Makefile | 11 +++ bolt/connectors/Connector.h | 2 +- bolt/core/QueryConfig.h | 35 +++++++ bolt/exec/HashBuild.cpp | 16 ++- bolt/exec/HashJoinBridge.h | 10 ++ bolt/exec/HashProbe.cpp | 3 + bolt/exec/HashTable.cpp | 140 ++++++++++++++++++++++++++- bolt/exec/HashTable.h | 80 ++++++++++++++- bolt/exec/Operator.cpp | 35 +++++++ bolt/exec/Operator.h | 2 + bolt/exec/RowContainer.cpp | 104 ++++++++++++++++++++ bolt/exec/RowContainer.h | 5 + bolt/exec/tests/HashTableTest.cpp | 69 +++++++++++++ bolt/exec/tests/RowContainerTest.cpp | 62 ++++++++++++ 15 files changed, 585 insertions(+), 26 deletions(-) diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md index f1b6aeaf..d44968eb 100644 --- a/.github/pull_request_template.md +++ b/.github/pull_request_template.md @@ -27,15 +27,17 @@ Please verify that your change does not introduce performance regressions. --> - [ ] **No Impact**: This change does not affect the critical path (e.g., build system, doc, error handling). - [ ] **Positive Impact**: I have run benchmarks. -
- Click to view Benchmark Results +
+Click to view Benchmark Results + +```text +Paste your google-benchmark or TPC-H results here. +Before: 10.5s +After: 8.2s (+20%) +``` + +
- ```text - Paste your google-benchmark or TPC-H results here. - Before: 10.5s - After: 8.2s (+20%) - ``` -
- [ ] **Negative Impact**: Explained below (e.g., trade-off for correctness). ### Release Note @@ -70,13 +72,12 @@ If yes, please describe how users should migrate. - [ ] No - [ ] Yes (Description: ...) -
- Click to view Breaking Changes - - ```text - Breaking Changes: - - Description of the breaking change. - - Possible solutions or workarounds. - - Any other relevant information. - ``` -
+
+Click to view Breaking Changes +```text +Breaking Changes: +- Description of the breaking change. +- Possible solutions or workarounds. +- Any other relevant information. +``` +
diff --git a/Makefile b/Makefile index 0f8592a0..cd8a4f69 100644 --- a/Makefile +++ b/Makefile @@ -348,6 +348,17 @@ unittest_coverage: debug_with_test_cov #: Build with debugging and run unit tes lcov --remove coverage.info '/usr/*' '*/.conan/data/*' '*/_build/*' '*/tests/*' '*/test/*' --output-file coverage_striped.info && \ genhtml --ignore-errors source coverage_striped.info --output-directory coverage +unittest_single: +ifndef TARGET + $(error TARGET is undefined. Usage: make unittest_single TARGET=TargetName [TEST=TestFilter]) +endif + cmake --build $(BUILD_BASE_DIR)/Release --target $(TARGET) -j $(NUM_THREADS) +ifneq ($(TEST),) + export GTEST_FILTER="$(TEST)" && ctest --test-dir $(BUILD_BASE_DIR)/Release -R "$(TARGET)" --output-on-failure --timeout 7200 +else + ctest --test-dir $(BUILD_BASE_DIR)/Release -R "$(TARGET)" --output-on-failure --timeout 7200 +endif + hdfstest: hdfs-debug-build #: Build with debugging, hdfs enabled and run hdfs tests ctest --test-dir $(BUILD_BASE_DIR)/Debug -j ${NUM_THREADS} --output-on-failure -R bolt_hdfs_file_test diff --git a/bolt/connectors/Connector.h b/bolt/connectors/Connector.h index 510628df..b1f91e98 100644 --- a/bolt/connectors/Connector.h +++ b/bolt/connectors/Connector.h @@ -412,7 +412,7 @@ class AsyncThreadCtx { void disallowPreload() { if (adaptive_ && allowPreload_.load()) { allowPreload_ = false; - LOG(WARNING) << "Disallow scan preload due to limited memory"; + LOG(INFO) << "Disallow scan preload due to limited memory"; } } diff --git a/bolt/core/QueryConfig.h b/bolt/core/QueryConfig.h index af9be731..39c4ef11 100644 --- a/bolt/core/QueryConfig.h +++ b/bolt/core/QueryConfig.h @@ -652,6 +652,21 @@ class QueryConfig { static constexpr const char* kSkewRowCountRatioThreshold = "hash_join_skewed_row_count_ratio"; + static constexpr const char* kEnableHashJoinArrayRecluster = + "enable_hash_join_array_recluster"; + + static constexpr const char* kHashJoinArrayReclusterMode = + "hash_join_array_recluster_mode"; + + static constexpr const char* kHashJoinArrayReclusterDuplicateRatioThreshold = + "hash_join_array_recluster_duplicate_ratio_threshold"; + + static constexpr const char* kHashJoinArrayReclusterMinProbeRowNumber = + "hash_join_array_recluster_min_probe_row_number"; + + static constexpr const char* kHashJoinArrayReclusterMinDistinctRowNumber = + "hash_join_array_recluster_min_distinct_row_number"; + // -1 means print all exceptions, usualing for debug // 0 means disable all exceptions, // 1 means print exceptions whose prefix is in the white list(default) @@ -1490,6 +1505,26 @@ class QueryConfig { return get(kHashJoinSkewedPartitionEnabled, true); } + bool hashJoinArrayReclusterEnabled() const { + return get(kEnableHashJoinArrayRecluster, true); + } + + std::string hashJoinArrayReclusterMode() const { + return get(kHashJoinArrayReclusterMode, "hash"); + } + + int64_t hashJoinArrayReclusterDuplicateRatioThreshold() const { + return get(kHashJoinArrayReclusterDuplicateRatioThreshold, 128); + } + + int64_t hashJoinArrayReclusterMinProbeRowNumber() const { + return get(kHashJoinArrayReclusterMinProbeRowNumber, 500000); + } + + int64_t hashJoinArrayReclusterMinDistinctRowNumber() const { + return get(kHashJoinArrayReclusterMinDistinctRowNumber, 32); + } + int32_t skewFileSizeRatioThreshold() const { return get(kSkewFileSizeRatioThreshold, 10); } diff --git a/bolt/exec/HashBuild.cpp b/bolt/exec/HashBuild.cpp index 2ad5df41..b34ff24e 100644 --- a/bolt/exec/HashBuild.cpp +++ b/bolt/exec/HashBuild.cpp @@ -183,6 +183,7 @@ void HashBuild::initialize() { if (isAntiJoin(joinType_) && joinNode_->filter()) { setupFilterForAntiJoins(keyChannelMap_); } + table_->setNumEstimatedProbeRows(joinBridge_->numEstimatedProbeRows()); } void HashBuild::setupTable() { @@ -203,6 +204,12 @@ void HashBuild::setupTable() { dependentTypes.emplace_back(tableType_->childAt(i)); } auto& queryConfig = operatorCtx_->driverCtx()->queryConfig(); + HashTableReclusterConfig hashTableReclusterConfig( + queryConfig.hashJoinArrayReclusterDuplicateRatioThreshold(), + queryConfig.hashJoinArrayReclusterMinProbeRowNumber(), + queryConfig.hashJoinArrayReclusterMinDistinctRowNumber(), + queryConfig.hashJoinArrayReclusterMode(), + queryConfig.hashJoinArrayReclusterEnabled()); if (joinNode_->isRightJoin() || joinNode_->isFullJoin() || joinNode_->isRightSemiProjectJoin()) { // Do not ignore null keys. @@ -215,7 +222,8 @@ void HashBuild::setupTable() { : BaseHashTable::HashMode::kArray, queryConfig.minTableRowsForParallelJoinBuild(), pool(), - queryConfig.enableJitRowEqVectors()); + queryConfig.enableJitRowEqVectors(), + hashTableReclusterConfig); } else { // Right semi join needs to tag build rows that were probed. const bool needProbedFlag = joinNode_->isRightSemiFilterJoin(); @@ -231,7 +239,8 @@ void HashBuild::setupTable() { : BaseHashTable::HashMode::kArray, queryConfig.minTableRowsForParallelJoinBuild(), pool(), - queryConfig.enableJitRowEqVectors()); + queryConfig.enableJitRowEqVectors(), + hashTableReclusterConfig); } else { // Ignore null keys table_ = HashTable::createForJoin( @@ -243,7 +252,8 @@ void HashBuild::setupTable() { : BaseHashTable::HashMode::kArray, queryConfig.minTableRowsForParallelJoinBuild(), pool(), - queryConfig.enableJitRowEqVectors()); + queryConfig.enableJitRowEqVectors(), + hashTableReclusterConfig); } } lookup_ = std::make_unique( diff --git a/bolt/exec/HashJoinBridge.h b/bolt/exec/HashJoinBridge.h index cf2f8b71..c057fd6f 100644 --- a/bolt/exec/HashJoinBridge.h +++ b/bolt/exec/HashJoinBridge.h @@ -134,6 +134,14 @@ class HashJoinBridge : public JoinBridge { } } + uint64_t numEstimatedProbeRows() const { + return numEstimatedProbeRows_; + } + + void setNumEstimatedProbeRows(uint64_t numEstimatedProbeRows) { + numEstimatedProbeRows_ = numEstimatedProbeRows; + } + private: uint32_t numBuilders_{0}; @@ -158,6 +166,8 @@ class HashJoinBridge : public JoinBridge { // This set can grow if HashBuild operator cannot load full partition in // memory and engages in recursive spilling. SpillPartitionSet spillPartitionSets_; + + uint64_t numEstimatedProbeRows_{0}; }; // Indicates if 'joinNode' is null-aware anti or left semi project join type and diff --git a/bolt/exec/HashProbe.cpp b/bolt/exec/HashProbe.cpp index 197afb87..ff0a798c 100644 --- a/bolt/exec/HashProbe.cpp +++ b/bolt/exec/HashProbe.cpp @@ -158,6 +158,9 @@ HashProbe::HashProbe( void HashProbe::initialize() { Operator::initialize(); + uint64_t totalRowCnt{0}; + operatorCtx_->traverseOpToGetRowCount(totalRowCnt); + joinBridge_->setNumEstimatedProbeRows(totalRowCnt); auto jitRowEqVectors = operatorCtx_->driverCtx()->queryConfig().enableJitRowEqVectors(); BOLT_CHECK(hashers_.empty()); diff --git a/bolt/exec/HashTable.cpp b/bolt/exec/HashTable.cpp index 10a17207..7849f57b 100644 --- a/bolt/exec/HashTable.cpp +++ b/bolt/exec/HashTable.cpp @@ -820,7 +820,12 @@ void HashTable::allocateTables(uint64_t size) { BOLT_CHECK_GT(size, 0); capacity_ = size; const uint64_t byteSize = capacity_ * tableSlotSize(); - BOLT_CHECK_EQ(byteSize % kBucketSize, 0); + BOLT_CHECK_EQ( + byteSize % kBucketSize, + 0, + "byteSize: {}, kBucketSize: {}, ", + byteSize, + kBucketSize); numTombstones_ = 0; sizeMask_ = byteSize - 1; numBuckets_ = byteSize / kBucketSize; @@ -1550,6 +1555,35 @@ void HashTable::clearUseRange(std::vector& useRange) { } } +template +void HashTable::tryRecluster() { + if (!reclusterConfig_.enableArrayRecluster) { + return; + } + + // note that maxDistinctNumber is the maximum number of distinct values + // in all hashers. But numDistinct_ is not DISTINCT number of values in + // the row container. + + if (numDistinct_ >= numEstimatedProbeRows_ || + numEstimatedProbeRows_ < reclusterConfig_.minProbeRowNumber) { + return; + } + size_t maxDistinctNumber = 0; + + for (auto& hasher : hashers_) { + maxDistinctNumber = std::max(maxDistinctNumber, hasher->numUniqueValues()); + } + int64_t duplicateRatio = + maxDistinctNumber > 0 ? numDistinct_ / maxDistinctNumber : 0; + if (duplicateRatio < reclusterConfig_.duplicateRatioThreshold || + maxDistinctNumber < reclusterConfig_.minDistinctRowNumber) { + return; + } + + reclusterDataByKey(); +} + template void HashTable::decideHashMode( int32_t numNew, @@ -1775,6 +1809,107 @@ bool mayUseValueIds(const BaseHashTable& table) { } } // namespace +template +void HashTable::reclusterDataByKey() { + if (!isJoinBuild_) { + LOG(INFO) << "reclusterDataByKey: joinBuild_ is false"; + return; + } + if (rows_->numRows() == 0) { + LOG(INFO) << "reclusterDataByKey: numRows is 0"; + return; + } + if (rows_->keyTypes().empty()) { + LOG(INFO) << "reclusterDataByKey: keyTypes is empty"; + return; + } + if (rows_->accumulators().size() > 0) { + LOG(INFO) << "reclusterDataByKey: accumulators is not empty"; + return; + } + if (sorted_) { + LOG(INFO) << "reclusterDataByKey: sorted_ is true"; + return; + } + + if (hashMode_ != HashMode::kArray) { + LOG(INFO) << "reclusterDataByKey: hashMode_ is not kArray"; + return; + } + + auto numRows = rows_->numRows(); + std::vector sortedRows(numRows); + + RowContainerIterator iter; + rows_->listRows(&iter, numRows, sortedRows.data()); + + if (reclusterConfig_.reclusterMode == + HashTableReclusterConfig::ReclusterMode::kSort) { + HybridSorter sorter{SortAlgo::kAuto}; + sorter.template sort( + sortedRows.begin(), + sortedRows.end(), + [this](const char* leftRow, const char* rightRow) { + return rows_->compareRows(leftRow, rightRow) < 0; + }); + } else if ( + reclusterConfig_.reclusterMode == + HashTableReclusterConfig::ReclusterMode::kHash) { + std::vector rowHashes(numRows); + folly::Range rowRange(sortedRows.data(), numRows); + + for (size_t i = 0; i < rows_->keyTypes().size(); ++i) { + bool mix = (i > 0); + rows_->hash(i, rowRange, mix, rowHashes.data()); + } + + folly::F14FastMap counts; + counts.reserve(4096); + for (uint64_t h : rowHashes) { + counts[h]++; + } + + folly::F14FastMap offsets; + offsets.reserve(counts.size()); + + size_t currentOffset = 0; + for (auto& kv : counts) { + offsets[kv.first] = currentOffset; + currentOffset += kv.second; + } + std::vector result(numRows); + + for (int32_t i = 0; i < numRows; ++i) { + uint64_t h = rowHashes[i]; + + size_t pos = offsets[h]++; + + result[pos] = sortedRows[i]; + } + sortedRows = std::move(result); + } else { + LOG(ERROR) << "reclusterDataByKey: unknown reclusterMode: " + << static_cast(reclusterConfig_.reclusterMode); + return; + } + + rows_ = std::move(rows_->cloneByOrder(sortedRows)); + if (table_ != nullptr) { + rows_->pool()->freeContiguous(tableAllocation_); + table_ = nullptr; + } + numTombstones_ = 0; + + for (size_t i = 0; i < otherTables_.size(); ++i) { + otherTables_[i]->reclusterDataByKey(); + } + capacity_ = bits::nextPowerOfTwo( + std::max(static_cast(numRows), kBucketSize)); + allocateTables(capacity_); + rehash(true); + sorted_ = true; +} + template void HashTable::prepareJoinTable( std::vector> tables, @@ -1852,6 +1987,9 @@ void HashTable::prepareJoinTable( } } else { decideHashMode(0); + if (hashMode_ == HashMode::kArray) { + tryRecluster(); + } } checkHashBitsOverlap(spillInputStartPartitionBit); LOG(INFO) << __FUNCTION__ << ": capacity_ = " << capacity_ diff --git a/bolt/exec/HashTable.h b/bolt/exec/HashTable.h index 17ab5407..41e22455 100644 --- a/bolt/exec/HashTable.h +++ b/bolt/exec/HashTable.h @@ -431,6 +431,14 @@ class BaseHashTable { return offThreadBuildTiming_; } + uint64_t numEstimatedProbeRows() const { + return numEstimatedProbeRows_; + } + + void setNumEstimatedProbeRows(uint64_t numEstimatedProbeRows) { + numEstimatedProbeRows_ = numEstimatedProbeRows; + } + protected: static FOLLY_ALWAYS_INLINE size_t tableSlotSize() { // Each slot is 8 bytes. @@ -458,6 +466,8 @@ class BaseHashTable { // Time spent in build outside of the calling thread. CpuWallTiming offThreadBuildTiming_; + + uint64_t numEstimatedProbeRows_ = 0; }; FOLLY_ALWAYS_INLINE std::ostream& operator<<( @@ -521,6 +531,62 @@ class ProbeState { uint8_t indexInTags_ = kNotSet; }; +struct HashTableReclusterConfig { + enum class ReclusterMode { kHash = 0, kSort = 1 }; + + int64_t duplicateRatioThreshold = 128; + int64_t minProbeRowNumber = 500000; + int64_t minDistinctRowNumber = 32; + ReclusterMode reclusterMode = ReclusterMode::kHash; + bool enableArrayRecluster = false; + + HashTableReclusterConfig() = default; + HashTableReclusterConfig( + int64_t duplicateRatioThreshold, + int64_t minProbeRowNumber, + int64_t minDistinctRowNumber, + ReclusterMode reclusterMode, + bool enableArrayRecluster) + : duplicateRatioThreshold(duplicateRatioThreshold), + minProbeRowNumber(minProbeRowNumber), + minDistinctRowNumber(minDistinctRowNumber), + reclusterMode(reclusterMode), + enableArrayRecluster(enableArrayRecluster) {} + HashTableReclusterConfig( + int64_t duplicateRatioThreshold, + int64_t minProbeRowNumber, + int64_t minDistinctRowNumber, + const std::string& reclusterMod, + bool enableArrayRecluster) + : duplicateRatioThreshold(duplicateRatioThreshold), + minProbeRowNumber(minProbeRowNumber), + minDistinctRowNumber(minDistinctRowNumber), + reclusterMode(parseReclusterMode(reclusterMod)), + enableArrayRecluster(enableArrayRecluster) {} + + static ReclusterMode parseReclusterMode(const std::string& mode) { + if (mode == "hash") { + return ReclusterMode::kHash; + } else if (mode == "sort") { + return ReclusterMode::kSort; + } else { + BOLT_FAIL("Unknown hash join array recluster mode: {}", mode); + } + } + static std::string modeString(ReclusterMode mode) { + switch (mode) { + case ReclusterMode::kHash: + return "hash"; + case ReclusterMode::kSort: + return "sort"; + default: + BOLT_FAIL( + "Unknown hash join array recluster mode: {}", + static_cast(mode)); + } + } +}; + template class HashTable : public BaseHashTable { public: @@ -586,8 +652,9 @@ class HashTable : public BaseHashTable { HashMode mode, uint32_t minTableSizeForParallelJoinBuild, memory::MemoryPool* pool, - bool jitRowEqVectors) { - return std::make_unique( + bool jitRowEqVectors, + const HashTableReclusterConfig& reclusterConfig = {}) { + auto hashTable = std::make_unique( std::move(hashers), std::vector{}, dependentTypes, @@ -599,6 +666,8 @@ class HashTable : public BaseHashTable { pool, nullptr, jitRowEqVectors); + hashTable->reclusterConfig_ = reclusterConfig; + return hashTable; } void groupProbe(HashLookup& lookup) override; @@ -690,6 +759,10 @@ class HashTable : public BaseHashTable { return hashMode_; } + void reclusterDataByKey(); + + void tryRecluster(); + void decideHashMode(int32_t numNew, bool disableRangeArrayHash = false) override; @@ -1157,8 +1230,9 @@ class HashTable : public BaseHashTable { #ifdef ENABLE_BOLT_JIT bolt::jit::CompiledModuleSP jitModule_; bolt::jit::CompiledModuleSP jitModuleRow_; - #endif + bool sorted_ = false; + HashTableReclusterConfig reclusterConfig_{}; }; } // namespace exec diff --git a/bolt/exec/Operator.cpp b/bolt/exec/Operator.cpp index 834f85e9..8c781960 100644 --- a/bolt/exec/Operator.cpp +++ b/bolt/exec/Operator.cpp @@ -346,6 +346,41 @@ void OperatorCtx::traverseOpToGetRowCount( } } +void OperatorCtx::traverseOpToGetRowCount(uint64_t& totalRowCount) const { + auto numDrivers = task()->numDrivers(driverCtx()); + + if (numDrivers == 1) { + const auto& operators = driver()->operators(); + + VLOG(5) << "operators.size()=" << operators.size() + << ", operatorId=" << operatorId(); + + for (auto i = operatorId() - 1; i >= 0; --i) { + auto metricValueStr = operators[i]->getRuntimeMetric( + OperatorMetricKey::kCanUsedToEstimateHashBuildPartitionNum, "false"); + auto metricValue = folly::to(metricValueStr); + + VLOG(5) << "OperatorIndex=" << i << ", operator is " + << operators[i]->toString() + << ", kCanUsedToEstimateHashBuildPartitionNum=" + << (metricValue ? "true" : "false"); + + if (metricValue) { + auto totalRowCountStr = + operators[i]->getRuntimeMetric(OperatorMetricKey::kTotalRowCount); + + BOLT_CHECK_NE(totalRowCountStr, "", "totalRowCountStr can't be empty") + + totalRowCount = folly::to(totalRowCountStr); + + LOG(INFO) << toString() << " totalRowCountStr = " << totalRowCountStr + << ", numDrivers = " << numDrivers; + break; + } + } + } +} + void OperatorCtx::adjustSpillCompressionKind( common::SpillConfig*& spillConfig) { if (!isFirstSpill_) { diff --git a/bolt/exec/Operator.h b/bolt/exec/Operator.h index f13a5562..a07f14b5 100644 --- a/bolt/exec/Operator.h +++ b/bolt/exec/Operator.h @@ -121,6 +121,8 @@ class OperatorCtx { uint64_t& totalRowCount, uint64_t& processedRowCount); + void traverseOpToGetRowCount(uint64_t& totalRowCount) const; + /// adjust SpillCompressionKind if estimatedSpillSize too large void adjustSpillCompressionKind(common::SpillConfig*& spillConfig); diff --git a/bolt/exec/RowContainer.cpp b/bolt/exec/RowContainer.cpp index 8ee26890..292c0d88 100644 --- a/bolt/exec/RowContainer.cpp +++ b/bolt/exec/RowContainer.cpp @@ -338,6 +338,110 @@ RowContainer::~RowContainer() { clear(); } +std::unique_ptr RowContainer::cloneByOrder( + const std::vector& sortedRows, + memory::MemoryPool* pool, + std::shared_ptr stringAllocator) { + std::vector dependentTypes; + if (types_.size() > keyTypes_.size()) { + dependentTypes.reserve(types_.size() - keyTypes_.size()); + for (size_t i = keyTypes_.size(); i < types_.size(); ++i) { + dependentTypes.push_back(types_[i]); + } + } + + auto newContainer = std::make_unique( + keyTypes_, + nullableKeys_, + accumulators_, + dependentTypes, + nextOffset_ != 0, + isJoinBuild_, + probedFlagOffset_ != 0, + hasNormalizedKeys_, + pool == nullptr ? rows_.pool() : pool, + stringAllocator == nullptr ? pool == nullptr + ? std::make_shared(rows_.pool()) + : std::make_shared(pool) + : stringAllocator); + if (hasVariableAccumulator_) { + BOLT_CHECK( + !usesExternalMemory_, + "Direct copy with external memory accumulators is not fully supported in this optimized path."); + } + auto& targetStringAllocator = newContainer->stringAllocator(); + + for (char* sourceRow : sortedRows) { + BOLT_CHECK_NOT_NULL(sourceRow, "Source row cannot be null"); + char* targetRow = newContainer->newRow(); + + ::memcpy(targetRow, sourceRow, fixedRowSize_); + + if (normalizedKeySize_ > 0) { + RowContainer::normalizedKey(targetRow) = + RowContainer::normalizedKey(sourceRow); + } + + bits::clearBit(targetRow, newContainer->freeFlagOffset_); + + if (nextOffset_ != 0) { + *reinterpret_cast(targetRow + nextOffset_) = nullptr; + } + + if (rowSizeOffset_ != 0) { + *reinterpret_cast(targetRow + rowSizeOffset_) = 0; + } + + for (int i = 0; i < types_.size(); ++i) { + if (types_[i]->isFixedWidth()) { + continue; + } + + auto col = rowColumns_[i]; + if (isNullAt(sourceRow, col)) { + continue; + } + + auto typeKind = types_[i]->kind(); + + if (typeKind == TypeKind::ROW || typeKind == TypeKind::ARRAY || + typeKind == TypeKind::MAP) { + auto sourceView = valueAt(sourceRow, col.offset()); + if (!sourceView.empty()) { + RowSizeTracker tracker( + targetRow[rowSizeOffset_], targetStringAllocator); + targetStringAllocator.copyMultipart( + StringView(sourceView.data(), sourceView.size()), + targetRow, + col.offset()); + } + } else if ( + typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) { + StringView sourceView = valueAt(sourceRow, col.offset()); + if (!sourceView.isInline()) { + RowSizeTracker tracker( + targetRow[rowSizeOffset_], targetStringAllocator); + targetStringAllocator.copyMultipart( + sourceView, targetRow, col.offset()); + } + } + } + + for (const auto& accumulator : accumulators_) { + if (accumulator.serializable()) { + uint32_t serializeSize = accumulator.getSerializeSize(sourceRow); + if (serializeSize > 0) { + std::vector buffer(serializeSize); + accumulator.serializeAccumulator(sourceRow, buffer.data()); + accumulator.deserializeAccumulator(targetRow, buffer.data()); + } + } + } + } + + return newContainer; +} + char* RowContainer::newRow() { BOLT_DCHECK(mutable_, "Can't add row into an immutable row container"); ++numRows_; diff --git a/bolt/exec/RowContainer.h b/bolt/exec/RowContainer.h index fa097dfa..ce7cd80b 100644 --- a/bolt/exec/RowContainer.h +++ b/bolt/exec/RowContainer.h @@ -235,6 +235,11 @@ class RowContainer { ~RowContainer(); + std::unique_ptr cloneByOrder( + const std::vector& rows, + memory::MemoryPool* pool = nullptr, + std::shared_ptr stringAllocator = nullptr); + static int32_t combineAlignments(int32_t a, int32_t b); /// 'keyTypes' gives the type of the key of each row. For a group by, diff --git a/bolt/exec/tests/HashTableTest.cpp b/bolt/exec/tests/HashTableTest.cpp index fb4a3fc1..633f845b 100644 --- a/bolt/exec/tests/HashTableTest.cpp +++ b/bolt/exec/tests/HashTableTest.cpp @@ -1230,4 +1230,73 @@ TEST(HashTableTest, tableInsertPartitionInfo) { ASSERT_EQ(overflows[i], info.overflows[i]); } } + +TEST_P(HashTableTest, reclusterDataByKey) { + std::vector dependentTypes = {BIGINT()}; + std::vector> hashers; + hashers.emplace_back(std::make_unique(BIGINT(), 0)); + + HashTableReclusterConfig config; + // Reusing 'enableRunParallel' to test different recluster modes (kSort vs + // kHash). + config.reclusterMode = GetParam().enableRunParallel + ? HashTableReclusterConfig::ReclusterMode::kSort + : HashTableReclusterConfig::ReclusterMode::kHash; + config.enableArrayRecluster = true; + config.duplicateRatioThreshold = 0; + config.minDistinctRowNumber = 0; + + auto hashTable = HashTable::createForJoin( + std::move(hashers), + dependentTypes, + true /*allowDuplicates*/, + false /*hasProbedFlag*/, + BaseHashTable::HashMode::kArray, + 1 /*minTableSizeForParallelJoinBuild*/, + pool(), + GetParam().jitRowEqVectors, + config); + const int32_t numRows = 4096; + const int32_t numDistinctKeys = 16; + auto keyVector = makeFlatVector( + numRows, [](auto row) { return (row % numDistinctKeys) + 1; }); + auto payloadVector = + makeFlatVector(numRows, [](auto row) { return row * 10; }); + + auto batch = makeRowVector({keyVector, payloadVector}); + copyVectorsToTable({batch}, 0, hashTable.get()); + hashTable->prepareJoinTable({}, executor_.get()); + auto& rows = *hashTable->rows(); + EXPECT_EQ(rows.numRows(), numRows); + + hashTable->reclusterDataByKey(); + + auto& newRows = *hashTable->rows(); + EXPECT_EQ(newRows.numRows(), numRows); + + int32_t keyOffset = newRows.columnAt(0).offset(); + RowContainerIterator iter; + std::vector resultKeys; + char* rowPtrs[1]; + + while (newRows.listRows(&iter, 1, rowPtrs) > 0) { + char* row = rowPtrs[0]; + resultKeys.push_back(newRows.valueAt(row, keyOffset)); + } + + ASSERT_EQ(resultKeys.size(), numRows); + std::unordered_set seenKeys(resultKeys.begin(), resultKeys.end()); + EXPECT_EQ(seenKeys.size(), numDistinctKeys); + int changeCount = 0; + for (size_t i = 1; i < resultKeys.size(); ++i) { + if (resultKeys[i] != resultKeys[i - 1]) { + changeCount++; + } + } + EXPECT_EQ(changeCount, numDistinctKeys - 1) + << "Data should be clustered into " << numDistinctKeys + << " groups, so there should be exactly " << numDistinctKeys - 1 + << " transitions."; +} + } // namespace bytedance::bolt::exec::test diff --git a/bolt/exec/tests/RowContainerTest.cpp b/bolt/exec/tests/RowContainerTest.cpp index b8d6f55f..71630626 100644 --- a/bolt/exec/tests/RowContainerTest.cpp +++ b/bolt/exec/tests/RowContainerTest.cpp @@ -1747,6 +1747,68 @@ DEBUG_ONLY_TEST_F(RowContainerTest, eraseAfterOomStoringString) { rowContainer->eraseRows(folly::Range(rows.data(), numRows)); } +TEST_F(RowContainerTest, cloneByOrder) { + std::vector keyTypes = {BIGINT()}; + std::vector dependentTypes = {VARCHAR()}; + + auto data = makeRowContainer(keyTypes, dependentTypes); + auto keyOffset = data->columnAt(0).offset(); + auto storeString = [&](char* row, const std::string& s) { + auto vector = makeFlatVector({StringView(s)}); + DecodedVector decoded(*vector); + data->store(decoded, 0, row, 1); + }; + // Insert 3 rows: (1, "a"), (2, "b"), (3, "c") + // Row 1 + auto* row1 = data->newRow(); + data->valueAt(row1, keyOffset) = 1; + std::string str1 = "value_a_long_string"; + storeString(row1, str1); + + // Row 2 + auto* row2 = data->newRow(); + data->valueAt(row2, keyOffset) = 2; + std::string str2 = "value_b"; + storeString(row2, str2); + + // Row 3 + auto* row3 = data->newRow(); + data->valueAt(row3, keyOffset) = 3; + std::string str3 = "value_c"; + storeString(row3, str3); + + EXPECT_EQ(data->numRows(), 3); + + // Create a vector of rows in reverse order: 3, 2, 1 + std::vector sortedRows = {row3, row2, row1}; + + auto newData = data->cloneByOrder(sortedRows); + + EXPECT_EQ(newData->numRows(), 3); + EXPECT_NE(newData.get(), data.get()); + + int64_t expectedKeys[] = {3, 2, 1}; + std::string expectedVals[] = {str3, str2, str1}; + + std::vector clonedRows; + clonedRows.resize(3); + RowContainerIterator iter; + auto numRows = newData->listRows(&iter, 3, clonedRows.data()); + EXPECT_EQ(numRows, 3); + int counter = 0; + auto newKeyOffset = newData->columnAt(0).offset(); + auto newStringOffset = newData->columnAt(1).offset(); + + for (auto row : clonedRows) { + int64_t key = newData->valueAt(row, newKeyOffset); + EXPECT_EQ(key, expectedKeys[counter]); + + auto val = newData->valueAt(row, newStringOffset); + EXPECT_EQ(std::string(val), expectedVals[counter]); + ++counter; + } +} + TEST_F(RowContainerTest, DISABLED_ConvertBenchmark) { VectorFuzzer fuzzer( {.vectorSize = 100000, .nullRatio = 0.1, .containerLength = 10}, pool());