diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md
index f1b6aeaf..d44968eb 100644
--- a/.github/pull_request_template.md
+++ b/.github/pull_request_template.md
@@ -27,15 +27,17 @@ Please verify that your change does not introduce performance regressions.
-->
- [ ] **No Impact**: This change does not affect the critical path (e.g., build system, doc, error handling).
- [ ] **Positive Impact**: I have run benchmarks.
-
- Click to view Benchmark Results
+
+Click to view Benchmark Results
+
+```text
+Paste your google-benchmark or TPC-H results here.
+Before: 10.5s
+After: 8.2s (+20%)
+```
+
+
- ```text
- Paste your google-benchmark or TPC-H results here.
- Before: 10.5s
- After: 8.2s (+20%)
- ```
-
- [ ] **Negative Impact**: Explained below (e.g., trade-off for correctness).
### Release Note
@@ -70,13 +72,12 @@ If yes, please describe how users should migrate.
- [ ] No
- [ ] Yes (Description: ...)
-
- Click to view Breaking Changes
-
- ```text
- Breaking Changes:
- - Description of the breaking change.
- - Possible solutions or workarounds.
- - Any other relevant information.
- ```
-
+
+Click to view Breaking Changes
+```text
+Breaking Changes:
+- Description of the breaking change.
+- Possible solutions or workarounds.
+- Any other relevant information.
+```
+
diff --git a/Makefile b/Makefile
index 0f8592a0..cd8a4f69 100644
--- a/Makefile
+++ b/Makefile
@@ -348,6 +348,17 @@ unittest_coverage: debug_with_test_cov #: Build with debugging and run unit tes
lcov --remove coverage.info '/usr/*' '*/.conan/data/*' '*/_build/*' '*/tests/*' '*/test/*' --output-file coverage_striped.info && \
genhtml --ignore-errors source coverage_striped.info --output-directory coverage
+unittest_single: #: Build a single test target and run it via ctest (TARGET=TargetName [TEST=TestFilter])
+ifndef TARGET
+ $(error TARGET is undefined. Usage: make unittest_single TARGET=TargetName [TEST=TestFilter])
+endif
+ cmake --build $(BUILD_BASE_DIR)/Release --target $(TARGET) -j $(NUM_THREADS)
+ifneq ($(TEST),)
+ export GTEST_FILTER="$(TEST)" && ctest --test-dir $(BUILD_BASE_DIR)/Release -R "$(TARGET)" --output-on-failure --timeout 7200
+else
+ ctest --test-dir $(BUILD_BASE_DIR)/Release -R "$(TARGET)" --output-on-failure --timeout 7200
+endif
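+# Example (placeholder target name): make unittest_single TARGET=<test_target_name> TEST='HashTableTest.*'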
+
hdfstest: hdfs-debug-build #: Build with debugging, hdfs enabled and run hdfs tests
ctest --test-dir $(BUILD_BASE_DIR)/Debug -j ${NUM_THREADS} --output-on-failure -R bolt_hdfs_file_test
diff --git a/bolt/connectors/Connector.h b/bolt/connectors/Connector.h
index 510628df..b1f91e98 100644
--- a/bolt/connectors/Connector.h
+++ b/bolt/connectors/Connector.h
@@ -412,7 +412,7 @@ class AsyncThreadCtx {
void disallowPreload() {
if (adaptive_ && allowPreload_.load()) {
allowPreload_ = false;
- LOG(WARNING) << "Disallow scan preload due to limited memory";
+ LOG(INFO) << "Disallow scan preload due to limited memory";
}
}
diff --git a/bolt/core/QueryConfig.h b/bolt/core/QueryConfig.h
index af9be731..39c4ef11 100644
--- a/bolt/core/QueryConfig.h
+++ b/bolt/core/QueryConfig.h
@@ -652,6 +652,21 @@ class QueryConfig {
static constexpr const char* kSkewRowCountRatioThreshold =
"hash_join_skewed_row_count_ratio";
+ static constexpr const char* kEnableHashJoinArrayRecluster =
+ "enable_hash_join_array_recluster";
+
+ static constexpr const char* kHashJoinArrayReclusterMode =
+ "hash_join_array_recluster_mode";
+
+ static constexpr const char* kHashJoinArrayReclusterDuplicateRatioThreshold =
+ "hash_join_array_recluster_duplicate_ratio_threshold";
+
+ static constexpr const char* kHashJoinArrayReclusterMinProbeRowNumber =
+ "hash_join_array_recluster_min_probe_row_number";
+
+ static constexpr const char* kHashJoinArrayReclusterMinDistinctRowNumber =
+ "hash_join_array_recluster_min_distinct_row_number";
+
  // -1 means print all exceptions, usually used for debugging
// 0 means disable all exceptions,
// 1 means print exceptions whose prefix is in the white list(default)
@@ -1490,6 +1505,26 @@ class QueryConfig {
return get(kHashJoinSkewedPartitionEnabled, true);
}
+ bool hashJoinArrayReclusterEnabled() const {
+ return get(kEnableHashJoinArrayRecluster, true);
+ }
+
+ std::string hashJoinArrayReclusterMode() const {
+ return get(kHashJoinArrayReclusterMode, "hash");
+ }
+
+ int64_t hashJoinArrayReclusterDuplicateRatioThreshold() const {
+ return get(kHashJoinArrayReclusterDuplicateRatioThreshold, 128);
+ }
+
+ int64_t hashJoinArrayReclusterMinProbeRowNumber() const {
+ return get(kHashJoinArrayReclusterMinProbeRowNumber, 500000);
+ }
+
+ int64_t hashJoinArrayReclusterMinDistinctRowNumber() const {
+ return get(kHashJoinArrayReclusterMinDistinctRowNumber, 32);
+ }
+
int32_t skewFileSizeRatioThreshold() const {
return get(kSkewFileSizeRatioThreshold, 10);
}
diff --git a/bolt/exec/HashBuild.cpp b/bolt/exec/HashBuild.cpp
index 2ad5df41..b34ff24e 100644
--- a/bolt/exec/HashBuild.cpp
+++ b/bolt/exec/HashBuild.cpp
@@ -183,6 +183,7 @@ void HashBuild::initialize() {
if (isAntiJoin(joinType_) && joinNode_->filter()) {
setupFilterForAntiJoins(keyChannelMap_);
}
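+  // Forward the probe-side row-count estimate recorded on the join bridge to
+  // the hash table so its recluster heuristic can use it.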
+ table_->setNumEstimatedProbeRows(joinBridge_->numEstimatedProbeRows());
}
void HashBuild::setupTable() {
@@ -203,6 +204,12 @@ void HashBuild::setupTable() {
dependentTypes.emplace_back(tableType_->childAt(i));
}
auto& queryConfig = operatorCtx_->driverCtx()->queryConfig();
+ HashTableReclusterConfig hashTableReclusterConfig(
+ queryConfig.hashJoinArrayReclusterDuplicateRatioThreshold(),
+ queryConfig.hashJoinArrayReclusterMinProbeRowNumber(),
+ queryConfig.hashJoinArrayReclusterMinDistinctRowNumber(),
+ queryConfig.hashJoinArrayReclusterMode(),
+ queryConfig.hashJoinArrayReclusterEnabled());
if (joinNode_->isRightJoin() || joinNode_->isFullJoin() ||
joinNode_->isRightSemiProjectJoin()) {
// Do not ignore null keys.
@@ -215,7 +222,8 @@ void HashBuild::setupTable() {
: BaseHashTable::HashMode::kArray,
queryConfig.minTableRowsForParallelJoinBuild(),
pool(),
- queryConfig.enableJitRowEqVectors());
+ queryConfig.enableJitRowEqVectors(),
+ hashTableReclusterConfig);
} else {
// Right semi join needs to tag build rows that were probed.
const bool needProbedFlag = joinNode_->isRightSemiFilterJoin();
@@ -231,7 +239,8 @@ void HashBuild::setupTable() {
: BaseHashTable::HashMode::kArray,
queryConfig.minTableRowsForParallelJoinBuild(),
pool(),
- queryConfig.enableJitRowEqVectors());
+ queryConfig.enableJitRowEqVectors(),
+ hashTableReclusterConfig);
} else {
// Ignore null keys
table_ = HashTable::createForJoin(
@@ -243,7 +252,8 @@ void HashBuild::setupTable() {
: BaseHashTable::HashMode::kArray,
queryConfig.minTableRowsForParallelJoinBuild(),
pool(),
- queryConfig.enableJitRowEqVectors());
+ queryConfig.enableJitRowEqVectors(),
+ hashTableReclusterConfig);
}
}
lookup_ = std::make_unique(
diff --git a/bolt/exec/HashJoinBridge.h b/bolt/exec/HashJoinBridge.h
index cf2f8b71..c057fd6f 100644
--- a/bolt/exec/HashJoinBridge.h
+++ b/bolt/exec/HashJoinBridge.h
@@ -134,6 +134,14 @@ class HashJoinBridge : public JoinBridge {
}
}
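+  // Estimated number of probe-side input rows; set by the probe operator and
+  // read by the build side when setting up the join hash table.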
+ uint64_t numEstimatedProbeRows() const {
+ return numEstimatedProbeRows_;
+ }
+
+ void setNumEstimatedProbeRows(uint64_t numEstimatedProbeRows) {
+ numEstimatedProbeRows_ = numEstimatedProbeRows;
+ }
+
private:
uint32_t numBuilders_{0};
@@ -158,6 +166,8 @@ class HashJoinBridge : public JoinBridge {
// This set can grow if HashBuild operator cannot load full partition in
// memory and engages in recursive spilling.
SpillPartitionSet spillPartitionSets_;
+
+ uint64_t numEstimatedProbeRows_{0};
};
// Indicates if 'joinNode' is null-aware anti or left semi project join type and
diff --git a/bolt/exec/HashProbe.cpp b/bolt/exec/HashProbe.cpp
index 197afb87..ff0a798c 100644
--- a/bolt/exec/HashProbe.cpp
+++ b/bolt/exec/HashProbe.cpp
@@ -158,6 +158,9 @@ HashProbe::HashProbe(
void HashProbe::initialize() {
Operator::initialize();
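+  // Estimate this probe's total input rows from upstream operators and
+  // publish the estimate on the join bridge for the build side.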
+ uint64_t totalRowCnt{0};
+ operatorCtx_->traverseOpToGetRowCount(totalRowCnt);
+ joinBridge_->setNumEstimatedProbeRows(totalRowCnt);
auto jitRowEqVectors =
operatorCtx_->driverCtx()->queryConfig().enableJitRowEqVectors();
BOLT_CHECK(hashers_.empty());
diff --git a/bolt/exec/HashTable.cpp b/bolt/exec/HashTable.cpp
index 10a17207..7849f57b 100644
--- a/bolt/exec/HashTable.cpp
+++ b/bolt/exec/HashTable.cpp
@@ -820,7 +820,12 @@ void HashTable::allocateTables(uint64_t size) {
BOLT_CHECK_GT(size, 0);
capacity_ = size;
const uint64_t byteSize = capacity_ * tableSlotSize();
- BOLT_CHECK_EQ(byteSize % kBucketSize, 0);
+  BOLT_CHECK_EQ(
+      byteSize % kBucketSize,
+      0,
+      "byteSize: {}, kBucketSize: {}",
+      byteSize,
+      kBucketSize);
numTombstones_ = 0;
sizeMask_ = byteSize - 1;
numBuckets_ = byteSize / kBucketSize;
@@ -1550,6 +1555,35 @@ void HashTable::clearUseRange(std::vector& useRange) {
}
}
+template <bool ignoreNullKeys>
+void HashTable<ignoreNullKeys>::tryRecluster() {
+ if (!reclusterConfig_.enableArrayRecluster) {
+ return;
+ }
+
+  // Note that maxDistinctNumber is the largest number of distinct values
+  // across all hashers, while numDistinct_, despite its name, counts every
+  // row in the join build's row container, including rows with duplicate keys.
+
+ if (numDistinct_ >= numEstimatedProbeRows_ ||
+ numEstimatedProbeRows_ < reclusterConfig_.minProbeRowNumber) {
+ return;
+ }
+ size_t maxDistinctNumber = 0;
+
+ for (auto& hasher : hashers_) {
+ maxDistinctNumber = std::max(maxDistinctNumber, hasher->numUniqueValues());
+ }
+ int64_t duplicateRatio =
+ maxDistinctNumber > 0 ? numDistinct_ / maxDistinctNumber : 0;
+ if (duplicateRatio < reclusterConfig_.duplicateRatioThreshold ||
+ maxDistinctNumber < reclusterConfig_.minDistinctRowNumber) {
+ return;
+ }
+
+ reclusterDataByKey();
+}
+
template
void HashTable::decideHashMode(
int32_t numNew,
@@ -1775,6 +1809,107 @@ bool mayUseValueIds(const BaseHashTable& table) {
}
} // namespace
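+// Reorders the build-side row container so that rows with equal join keys are
+// stored contiguously (by sorting on the keys or by grouping rows on their key
+// hashes), then rebuilds the array hash table over the reordered rows.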
+template <bool ignoreNullKeys>
+void HashTable<ignoreNullKeys>::reclusterDataByKey() {
+ if (!isJoinBuild_) {
+ LOG(INFO) << "reclusterDataByKey: joinBuild_ is false";
+ return;
+ }
+ if (rows_->numRows() == 0) {
+ LOG(INFO) << "reclusterDataByKey: numRows is 0";
+ return;
+ }
+ if (rows_->keyTypes().empty()) {
+ LOG(INFO) << "reclusterDataByKey: keyTypes is empty";
+ return;
+ }
+ if (rows_->accumulators().size() > 0) {
+ LOG(INFO) << "reclusterDataByKey: accumulators is not empty";
+ return;
+ }
+ if (sorted_) {
+ LOG(INFO) << "reclusterDataByKey: sorted_ is true";
+ return;
+ }
+
+ if (hashMode_ != HashMode::kArray) {
+ LOG(INFO) << "reclusterDataByKey: hashMode_ is not kArray";
+ return;
+ }
+
+ auto numRows = rows_->numRows();
+  std::vector<char*> sortedRows(numRows);
+
+ RowContainerIterator iter;
+ rows_->listRows(&iter, numRows, sortedRows.data());
+
+ if (reclusterConfig_.reclusterMode ==
+ HashTableReclusterConfig::ReclusterMode::kSort) {
+ HybridSorter sorter{SortAlgo::kAuto};
+ sorter.template sort(
+ sortedRows.begin(),
+ sortedRows.end(),
+ [this](const char* leftRow, const char* rightRow) {
+ return rows_->compareRows(leftRow, rightRow) < 0;
+ });
+ } else if (
+ reclusterConfig_.reclusterMode ==
+ HashTableReclusterConfig::ReclusterMode::kHash) {
+    std::vector<uint64_t> rowHashes(numRows);
+    folly::Range<char**> rowRange(sortedRows.data(), numRows);
+
+ for (size_t i = 0; i < rows_->keyTypes().size(); ++i) {
+ bool mix = (i > 0);
+ rows_->hash(i, rowRange, mix, rowHashes.data());
+ }
+
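+    // Counting-sort style grouping: count rows per key hash, give each hash a
+    // contiguous output range, then scatter rows into their ranges so rows
+    // with equal hashes become adjacent.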
+    folly::F14FastMap<uint64_t, size_t> counts;
+ counts.reserve(4096);
+ for (uint64_t h : rowHashes) {
+ counts[h]++;
+ }
+
+    folly::F14FastMap<uint64_t, size_t> offsets;
+ offsets.reserve(counts.size());
+
+ size_t currentOffset = 0;
+ for (auto& kv : counts) {
+ offsets[kv.first] = currentOffset;
+ currentOffset += kv.second;
+ }
+    std::vector<char*> result(numRows);
+
+ for (int32_t i = 0; i < numRows; ++i) {
+ uint64_t h = rowHashes[i];
+
+ size_t pos = offsets[h]++;
+
+ result[pos] = sortedRows[i];
+ }
+ sortedRows = std::move(result);
+ } else {
+ LOG(ERROR) << "reclusterDataByKey: unknown reclusterMode: "
+ << static_cast(reclusterConfig_.reclusterMode);
+ return;
+ }
+
+ rows_ = std::move(rows_->cloneByOrder(sortedRows));
+ if (table_ != nullptr) {
+ rows_->pool()->freeContiguous(tableAllocation_);
+ table_ = nullptr;
+ }
+ numTombstones_ = 0;
+
+ for (size_t i = 0; i < otherTables_.size(); ++i) {
+ otherTables_[i]->reclusterDataByKey();
+ }
+  capacity_ = bits::nextPowerOfTwo(
+      std::max(static_cast<uint64_t>(numRows), kBucketSize));
+ allocateTables(capacity_);
+ rehash(true);
+ sorted_ = true;
+}
+
template
void HashTable::prepareJoinTable(
std::vector> tables,
@@ -1852,6 +1987,9 @@ void HashTable::prepareJoinTable(
}
} else {
decideHashMode(0);
+ if (hashMode_ == HashMode::kArray) {
+ tryRecluster();
+ }
}
checkHashBitsOverlap(spillInputStartPartitionBit);
LOG(INFO) << __FUNCTION__ << ": capacity_ = " << capacity_
diff --git a/bolt/exec/HashTable.h b/bolt/exec/HashTable.h
index 17ab5407..41e22455 100644
--- a/bolt/exec/HashTable.h
+++ b/bolt/exec/HashTable.h
@@ -431,6 +431,14 @@ class BaseHashTable {
return offThreadBuildTiming_;
}
+ uint64_t numEstimatedProbeRows() const {
+ return numEstimatedProbeRows_;
+ }
+
+ void setNumEstimatedProbeRows(uint64_t numEstimatedProbeRows) {
+ numEstimatedProbeRows_ = numEstimatedProbeRows;
+ }
+
protected:
static FOLLY_ALWAYS_INLINE size_t tableSlotSize() {
// Each slot is 8 bytes.
@@ -458,6 +466,8 @@ class BaseHashTable {
// Time spent in build outside of the calling thread.
CpuWallTiming offThreadBuildTiming_;
+
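+  // Estimated number of probe-side input rows; used by the hash join array
+  // recluster heuristic.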
+ uint64_t numEstimatedProbeRows_ = 0;
};
FOLLY_ALWAYS_INLINE std::ostream& operator<<(
@@ -521,6 +531,62 @@ class ProbeState {
uint8_t indexInTags_ = kNotSet;
};
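+// Controls if and how an array-mode join hash table reclusters its build rows
+// so that rows sharing a key are stored contiguously before probing.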
+struct HashTableReclusterConfig {
+ enum class ReclusterMode { kHash = 0, kSort = 1 };
+
+ int64_t duplicateRatioThreshold = 128;
+ int64_t minProbeRowNumber = 500000;
+ int64_t minDistinctRowNumber = 32;
+ ReclusterMode reclusterMode = ReclusterMode::kHash;
+ bool enableArrayRecluster = false;
+
+ HashTableReclusterConfig() = default;
+ HashTableReclusterConfig(
+ int64_t duplicateRatioThreshold,
+ int64_t minProbeRowNumber,
+ int64_t minDistinctRowNumber,
+ ReclusterMode reclusterMode,
+ bool enableArrayRecluster)
+ : duplicateRatioThreshold(duplicateRatioThreshold),
+ minProbeRowNumber(minProbeRowNumber),
+ minDistinctRowNumber(minDistinctRowNumber),
+ reclusterMode(reclusterMode),
+ enableArrayRecluster(enableArrayRecluster) {}
+ HashTableReclusterConfig(
+ int64_t duplicateRatioThreshold,
+ int64_t minProbeRowNumber,
+ int64_t minDistinctRowNumber,
+ const std::string& reclusterMod,
+ bool enableArrayRecluster)
+ : duplicateRatioThreshold(duplicateRatioThreshold),
+ minProbeRowNumber(minProbeRowNumber),
+ minDistinctRowNumber(minDistinctRowNumber),
+        reclusterMode(parseReclusterMode(reclusterMode)),
+ enableArrayRecluster(enableArrayRecluster) {}
+
+ static ReclusterMode parseReclusterMode(const std::string& mode) {
+ if (mode == "hash") {
+ return ReclusterMode::kHash;
+ } else if (mode == "sort") {
+ return ReclusterMode::kSort;
+ } else {
+ BOLT_FAIL("Unknown hash join array recluster mode: {}", mode);
+ }
+ }
+  static std::string modeString(ReclusterMode mode) {
+    switch (mode) {
+      case ReclusterMode::kHash:
+        return "hash";
+      case ReclusterMode::kSort:
+        return "sort";
+      default:
+        BOLT_FAIL(
+            "Unknown hash join array recluster mode: {}",
+            static_cast<int>(mode));
+    }
+  }
+};
+
template
class HashTable : public BaseHashTable {
public:
@@ -586,8 +652,9 @@ class HashTable : public BaseHashTable {
HashMode mode,
uint32_t minTableSizeForParallelJoinBuild,
memory::MemoryPool* pool,
- bool jitRowEqVectors) {
- return std::make_unique(
+ bool jitRowEqVectors,
+ const HashTableReclusterConfig& reclusterConfig = {}) {
+    auto hashTable = std::make_unique<HashTable>(
std::move(hashers),
std::vector{},
dependentTypes,
@@ -599,6 +666,8 @@ class HashTable : public BaseHashTable {
pool,
nullptr,
jitRowEqVectors);
+ hashTable->reclusterConfig_ = reclusterConfig;
+ return hashTable;
}
void groupProbe(HashLookup& lookup) override;
@@ -690,6 +759,10 @@ class HashTable : public BaseHashTable {
return hashMode_;
}
+ void reclusterDataByKey();
+
+ void tryRecluster();
+
void decideHashMode(int32_t numNew, bool disableRangeArrayHash = false)
override;
@@ -1157,8 +1230,9 @@ class HashTable : public BaseHashTable {
#ifdef ENABLE_BOLT_JIT
bolt::jit::CompiledModuleSP jitModule_;
bolt::jit::CompiledModuleSP jitModuleRow_;
-
#endif
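+  // Set to true once reclusterDataByKey() has reordered the rows so the table
+  // is not reclustered again.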
+ bool sorted_ = false;
+ HashTableReclusterConfig reclusterConfig_{};
};
} // namespace exec
diff --git a/bolt/exec/Operator.cpp b/bolt/exec/Operator.cpp
index 834f85e9..8c781960 100644
--- a/bolt/exec/Operator.cpp
+++ b/bolt/exec/Operator.cpp
@@ -346,6 +346,41 @@ void OperatorCtx::traverseOpToGetRowCount(
}
}
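+// Estimates the probe-side input row count for a single-driver pipeline by
+// walking upstream operators in this driver and taking the total row count
+// from the first one whose runtime metrics mark it as usable for sizing the
+// hash build.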
+void OperatorCtx::traverseOpToGetRowCount(uint64_t& totalRowCount) const {
+ auto numDrivers = task()->numDrivers(driverCtx());
+
+ if (numDrivers == 1) {
+ const auto& operators = driver()->operators();
+
+ VLOG(5) << "operators.size()=" << operators.size()
+ << ", operatorId=" << operatorId();
+
+ for (auto i = operatorId() - 1; i >= 0; --i) {
+ auto metricValueStr = operators[i]->getRuntimeMetric(
+ OperatorMetricKey::kCanUsedToEstimateHashBuildPartitionNum, "false");
+      auto metricValue = folly::to<bool>(metricValueStr);
+
+ VLOG(5) << "OperatorIndex=" << i << ", operator is "
+ << operators[i]->toString()
+ << ", kCanUsedToEstimateHashBuildPartitionNum="
+ << (metricValue ? "true" : "false");
+
+      if (metricValue) {
+        auto totalRowCountStr =
+            operators[i]->getRuntimeMetric(OperatorMetricKey::kTotalRowCount);
+
+        BOLT_CHECK_NE(totalRowCountStr, "", "totalRowCountStr can't be empty");
+
+        totalRowCount = folly::to<uint64_t>(totalRowCountStr);
+
+ LOG(INFO) << toString() << " totalRowCountStr = " << totalRowCountStr
+ << ", numDrivers = " << numDrivers;
+ break;
+ }
+ }
+ }
+}
+
void OperatorCtx::adjustSpillCompressionKind(
common::SpillConfig*& spillConfig) {
if (!isFirstSpill_) {
diff --git a/bolt/exec/Operator.h b/bolt/exec/Operator.h
index f13a5562..a07f14b5 100644
--- a/bolt/exec/Operator.h
+++ b/bolt/exec/Operator.h
@@ -121,6 +121,8 @@ class OperatorCtx {
uint64_t& totalRowCount,
uint64_t& processedRowCount);
+ void traverseOpToGetRowCount(uint64_t& totalRowCount) const;
+
/// adjust SpillCompressionKind if estimatedSpillSize too large
void adjustSpillCompressionKind(common::SpillConfig*& spillConfig);
diff --git a/bolt/exec/RowContainer.cpp b/bolt/exec/RowContainer.cpp
index 8ee26890..292c0d88 100644
--- a/bolt/exec/RowContainer.cpp
+++ b/bolt/exec/RowContainer.cpp
@@ -338,6 +338,110 @@ RowContainer::~RowContainer() {
clear();
}
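+// Creates a new RowContainer with the same layout and copies the given rows
+// into it in order. Fixed-width columns are copied byte for byte; out-of-line
+// strings and complex values are re-copied into the new container's string
+// allocator, and serializable accumulators are round-tripped through their
+// serialized form.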
+std::unique_ptr<RowContainer> RowContainer::cloneByOrder(
+    const std::vector<char*>& sortedRows,
+    memory::MemoryPool* pool,
+    std::shared_ptr<HashStringAllocator> stringAllocator) {
+  std::vector<TypePtr> dependentTypes;
+ if (types_.size() > keyTypes_.size()) {
+ dependentTypes.reserve(types_.size() - keyTypes_.size());
+ for (size_t i = keyTypes_.size(); i < types_.size(); ++i) {
+ dependentTypes.push_back(types_[i]);
+ }
+ }
+
+  auto newContainer = std::make_unique<RowContainer>(
+ keyTypes_,
+ nullableKeys_,
+ accumulators_,
+ dependentTypes,
+ nextOffset_ != 0,
+ isJoinBuild_,
+ probedFlagOffset_ != 0,
+ hasNormalizedKeys_,
+ pool == nullptr ? rows_.pool() : pool,
+      stringAllocator == nullptr
+          ? (pool == nullptr
+                 ? std::make_shared<HashStringAllocator>(rows_.pool())
+                 : std::make_shared<HashStringAllocator>(pool))
+          : stringAllocator);
+ if (hasVariableAccumulator_) {
+ BOLT_CHECK(
+ !usesExternalMemory_,
+ "Direct copy with external memory accumulators is not fully supported in this optimized path.");
+ }
+ auto& targetStringAllocator = newContainer->stringAllocator();
+
+ for (char* sourceRow : sortedRows) {
+ BOLT_CHECK_NOT_NULL(sourceRow, "Source row cannot be null");
+ char* targetRow = newContainer->newRow();
+
+ ::memcpy(targetRow, sourceRow, fixedRowSize_);
+
+ if (normalizedKeySize_ > 0) {
+ RowContainer::normalizedKey(targetRow) =
+ RowContainer::normalizedKey(sourceRow);
+ }
+
+ bits::clearBit(targetRow, newContainer->freeFlagOffset_);
+
+    if (nextOffset_ != 0) {
+      *reinterpret_cast<char**>(targetRow + nextOffset_) = nullptr;
+    }
+
+    if (rowSizeOffset_ != 0) {
+      *reinterpret_cast<uint32_t*>(targetRow + rowSizeOffset_) = 0;
+    }
+
+ for (int i = 0; i < types_.size(); ++i) {
+ if (types_[i]->isFixedWidth()) {
+ continue;
+ }
+
+ auto col = rowColumns_[i];
+ if (isNullAt(sourceRow, col)) {
+ continue;
+ }
+
+ auto typeKind = types_[i]->kind();
+
+ if (typeKind == TypeKind::ROW || typeKind == TypeKind::ARRAY ||
+ typeKind == TypeKind::MAP) {
+        auto sourceView = valueAt<StringView>(sourceRow, col.offset());
+ if (!sourceView.empty()) {
+ RowSizeTracker tracker(
+ targetRow[rowSizeOffset_], targetStringAllocator);
+ targetStringAllocator.copyMultipart(
+ StringView(sourceView.data(), sourceView.size()),
+ targetRow,
+ col.offset());
+ }
+ } else if (
+ typeKind == TypeKind::VARCHAR || typeKind == TypeKind::VARBINARY) {
+        StringView sourceView = valueAt<StringView>(sourceRow, col.offset());
+ if (!sourceView.isInline()) {
+ RowSizeTracker tracker(
+ targetRow[rowSizeOffset_], targetStringAllocator);
+ targetStringAllocator.copyMultipart(
+ sourceView, targetRow, col.offset());
+ }
+ }
+ }
+
+ for (const auto& accumulator : accumulators_) {
+ if (accumulator.serializable()) {
+ uint32_t serializeSize = accumulator.getSerializeSize(sourceRow);
+ if (serializeSize > 0) {
+          std::vector<char> buffer(serializeSize);
+ accumulator.serializeAccumulator(sourceRow, buffer.data());
+ accumulator.deserializeAccumulator(targetRow, buffer.data());
+ }
+ }
+ }
+ }
+
+ return newContainer;
+}
+
char* RowContainer::newRow() {
BOLT_DCHECK(mutable_, "Can't add row into an immutable row container");
++numRows_;
diff --git a/bolt/exec/RowContainer.h b/bolt/exec/RowContainer.h
index fa097dfa..ce7cd80b 100644
--- a/bolt/exec/RowContainer.h
+++ b/bolt/exec/RowContainer.h
@@ -235,6 +235,11 @@ class RowContainer {
~RowContainer();
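+  /// Creates a new RowContainer with the same layout holding copies of 'rows'
+  /// in the given order. Uses this container's memory pool and a fresh string
+  /// allocator unless 'pool' and 'stringAllocator' are provided.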
+  std::unique_ptr<RowContainer> cloneByOrder(
+      const std::vector<char*>& rows,
+      memory::MemoryPool* pool = nullptr,
+      std::shared_ptr<HashStringAllocator> stringAllocator = nullptr);
+
static int32_t combineAlignments(int32_t a, int32_t b);
/// 'keyTypes' gives the type of the key of each row. For a group by,
diff --git a/bolt/exec/tests/HashTableTest.cpp b/bolt/exec/tests/HashTableTest.cpp
index fb4a3fc1..633f845b 100644
--- a/bolt/exec/tests/HashTableTest.cpp
+++ b/bolt/exec/tests/HashTableTest.cpp
@@ -1230,4 +1230,73 @@ TEST(HashTableTest, tableInsertPartitionInfo) {
ASSERT_EQ(overflows[i], info.overflows[i]);
}
}
+
+TEST_P(HashTableTest, reclusterDataByKey) {
+  std::vector<TypePtr> dependentTypes = {BIGINT()};
+  std::vector<std::unique_ptr<VectorHasher>> hashers;
+  hashers.emplace_back(std::make_unique<VectorHasher>(BIGINT(), 0));
+
+ HashTableReclusterConfig config;
+ // Reusing 'enableRunParallel' to test different recluster modes (kSort vs
+ // kHash).
+ config.reclusterMode = GetParam().enableRunParallel
+ ? HashTableReclusterConfig::ReclusterMode::kSort
+ : HashTableReclusterConfig::ReclusterMode::kHash;
+ config.enableArrayRecluster = true;
+ config.duplicateRatioThreshold = 0;
+ config.minDistinctRowNumber = 0;
+
+  auto hashTable = HashTable<true>::createForJoin(
+ std::move(hashers),
+ dependentTypes,
+ true /*allowDuplicates*/,
+ false /*hasProbedFlag*/,
+ BaseHashTable::HashMode::kArray,
+ 1 /*minTableSizeForParallelJoinBuild*/,
+ pool(),
+ GetParam().jitRowEqVectors,
+ config);
+ const int32_t numRows = 4096;
+ const int32_t numDistinctKeys = 16;
+  auto keyVector = makeFlatVector<int64_t>(
+      numRows, [](auto row) { return (row % numDistinctKeys) + 1; });
+  auto payloadVector =
+      makeFlatVector<int64_t>(numRows, [](auto row) { return row * 10; });
+
+ auto batch = makeRowVector({keyVector, payloadVector});
+ copyVectorsToTable({batch}, 0, hashTable.get());
+ hashTable->prepareJoinTable({}, executor_.get());
+ auto& rows = *hashTable->rows();
+ EXPECT_EQ(rows.numRows(), numRows);
+
+ hashTable->reclusterDataByKey();
+
+ auto& newRows = *hashTable->rows();
+ EXPECT_EQ(newRows.numRows(), numRows);
+
+ int32_t keyOffset = newRows.columnAt(0).offset();
+ RowContainerIterator iter;
+  std::vector<int64_t> resultKeys;
+ char* rowPtrs[1];
+
+ while (newRows.listRows(&iter, 1, rowPtrs) > 0) {
+ char* row = rowPtrs[0];
+    resultKeys.push_back(newRows.valueAt<int64_t>(row, keyOffset));
+ }
+
+ ASSERT_EQ(resultKeys.size(), numRows);
+  std::unordered_set<int64_t> seenKeys(resultKeys.begin(), resultKeys.end());
+ EXPECT_EQ(seenKeys.size(), numDistinctKeys);
+ int changeCount = 0;
+ for (size_t i = 1; i < resultKeys.size(); ++i) {
+ if (resultKeys[i] != resultKeys[i - 1]) {
+ changeCount++;
+ }
+ }
+ EXPECT_EQ(changeCount, numDistinctKeys - 1)
+ << "Data should be clustered into " << numDistinctKeys
+ << " groups, so there should be exactly " << numDistinctKeys - 1
+ << " transitions.";
+}
+
} // namespace bytedance::bolt::exec::test
diff --git a/bolt/exec/tests/RowContainerTest.cpp b/bolt/exec/tests/RowContainerTest.cpp
index b8d6f55f..71630626 100644
--- a/bolt/exec/tests/RowContainerTest.cpp
+++ b/bolt/exec/tests/RowContainerTest.cpp
@@ -1747,6 +1747,68 @@ DEBUG_ONLY_TEST_F(RowContainerTest, eraseAfterOomStoringString) {
rowContainer->eraseRows(folly::Range(rows.data(), numRows));
}
+TEST_F(RowContainerTest, cloneByOrder) {
+  std::vector<TypePtr> keyTypes = {BIGINT()};
+  std::vector<TypePtr> dependentTypes = {VARCHAR()};
+
+ auto data = makeRowContainer(keyTypes, dependentTypes);
+ auto keyOffset = data->columnAt(0).offset();
+ auto storeString = [&](char* row, const std::string& s) {
+    auto vector = makeFlatVector<StringView>({StringView(s)});
+ DecodedVector decoded(*vector);
+ data->store(decoded, 0, row, 1);
+ };
+ // Insert 3 rows: (1, "a"), (2, "b"), (3, "c")
+ // Row 1
+ auto* row1 = data->newRow();
+  data->valueAt<int64_t>(row1, keyOffset) = 1;
+ std::string str1 = "value_a_long_string";
+ storeString(row1, str1);
+
+ // Row 2
+ auto* row2 = data->newRow();
+  data->valueAt<int64_t>(row2, keyOffset) = 2;
+ std::string str2 = "value_b";
+ storeString(row2, str2);
+
+ // Row 3
+ auto* row3 = data->newRow();
+  data->valueAt<int64_t>(row3, keyOffset) = 3;
+ std::string str3 = "value_c";
+ storeString(row3, str3);
+
+ EXPECT_EQ(data->numRows(), 3);
+
+ // Create a vector of rows in reverse order: 3, 2, 1
+ std::vector sortedRows = {row3, row2, row1};
+
+ auto newData = data->cloneByOrder(sortedRows);
+
+ EXPECT_EQ(newData->numRows(), 3);
+ EXPECT_NE(newData.get(), data.get());
+
+ int64_t expectedKeys[] = {3, 2, 1};
+ std::string expectedVals[] = {str3, str2, str1};
+
+  std::vector<char*> clonedRows;
+ clonedRows.resize(3);
+ RowContainerIterator iter;
+ auto numRows = newData->listRows(&iter, 3, clonedRows.data());
+ EXPECT_EQ(numRows, 3);
+ int counter = 0;
+ auto newKeyOffset = newData->columnAt(0).offset();
+ auto newStringOffset = newData->columnAt(1).offset();
+
+ for (auto row : clonedRows) {
+    int64_t key = newData->valueAt<int64_t>(row, newKeyOffset);
+ EXPECT_EQ(key, expectedKeys[counter]);
+
+    auto val = newData->valueAt<StringView>(row, newStringOffset);
+ EXPECT_EQ(std::string(val), expectedVals[counter]);
+ ++counter;
+ }
+}
+
TEST_F(RowContainerTest, DISABLED_ConvertBenchmark) {
VectorFuzzer fuzzer(
{.vectorSize = 100000, .nullRatio = 0.1, .containerLength = 10}, pool());