diff --git a/bolt/connectors/Connector.h b/bolt/connectors/Connector.h index 510628df..0db13ea7 100644 --- a/bolt/connectors/Connector.h +++ b/bolt/connectors/Connector.h @@ -378,12 +378,12 @@ class AsyncThreadCtx { } void in() { - std::unique_lock lock(mutex_); + std::scoped_lock lock(mutex_); numIn_++; cv_.notify_one(); } void out() { - std::unique_lock lock(mutex_); + std::scoped_lock lock(mutex_); numIn_--; cv_.notify_one(); } @@ -401,12 +401,26 @@ class AsyncThreadCtx { return mutex_; } - int64_t& inPreloadingBytes() { + int64_t preloadBytesLimit() const { + return preloadBytesLimit_; + } + + void addPreloadingBytes(int64_t bytes) { + std::scoped_lock lock(mutex_); + addPreloadingBytesUntracked(bytes); + } + + int64_t inPreloadingBytes() const { + std::scoped_lock lock(mutex_); + return inPreloadingBytesUntracked(); + } + + int64_t inPreloadingBytesUntracked() const { return inPreloadingBytes_; } - int64_t preloadBytesLimit() const { - return preloadBytesLimit_; + void addPreloadingBytesUntracked(int64_t bytes) { + inPreloadingBytes_ += bytes; } void disallowPreload() { @@ -417,11 +431,56 @@ class AsyncThreadCtx { } bool allowPreload() { + if (adaptive_ && allowPreload_.load()) { + std::scoped_lock lock(mutex_); + return inPreloadingBytes_ < preloadBytesLimit_; + } return allowPreload_.load(); } + class Guard { + public: + Guard(AsyncThreadCtx* ctx, int64_t bytes = 0) : ctx_(ctx), bytes_(bytes) { + if (ctx_) { + ctx_->in(); + ctx_->addPreloadingBytes(bytes_); + } + } + + ~Guard() { + if (ctx_) { + ctx_->out(); + ctx_->addPreloadingBytes(-bytes_); + } + } + + Guard(const Guard&) = delete; + Guard& operator=(const Guard&) = delete; + + Guard(Guard&& other) noexcept : ctx_(other.ctx_), bytes_(other.bytes_) { + other.ctx_ = nullptr; + } + + Guard& operator=(Guard&& other) noexcept { + if (this != &other) { + if (ctx_) { + ctx_->out(); + ctx_->addPreloadingBytes(-bytes_); + } + ctx_ = other.ctx_; + bytes_ = other.bytes_; + other.ctx_ = nullptr; + } + return *this; 
+ } + + private: + AsyncThreadCtx* ctx_; + int64_t bytes_; + }; + private: - std::mutex mutex_; + mutable std::mutex mutex_; int numIn_{0}; int64_t inPreloadingBytes_{0}; int64_t preloadBytesLimit_{0}; diff --git a/bolt/dwio/common/DirectBufferedInput.cpp b/bolt/dwio/common/DirectBufferedInput.cpp index c23e34e3..de6bfe03 100644 --- a/bolt/dwio/common/DirectBufferedInput.cpp +++ b/bolt/dwio/common/DirectBufferedInput.cpp @@ -246,10 +246,6 @@ void DirectBufferedInput::readRegions( if (asyncLoad.load->state() != DirectCoalescedLoad::State::kPlanned) { return; } - // the load is valid, so asyncThreadCtx is not freed yet. - auto guard = - folly::makeGuard([&]() { asyncLoad.asyncThreadCtx->out(); }); - asyncLoad.asyncThreadCtx->in(); // trace in-flight loading // first check available memory allows to preload data, even if not, // the non-preload load will be sync loaded on the main thread. if (asyncLoad.canPreload()) { diff --git a/bolt/dwio/common/DirectBufferedInput.h b/bolt/dwio/common/DirectBufferedInput.h index e0b51171..8381c41f 100644 --- a/bolt/dwio/common/DirectBufferedInput.h +++ b/bolt/dwio/common/DirectBufferedInput.h @@ -200,8 +200,7 @@ class DirectBufferedInput : public BufferedInput { void updatePreloadingBytes(int64_t bytes) { if (asyncThreadCtx_) { - std::lock_guard lock(asyncThreadCtx_->getMutex()); - asyncThreadCtx_->inPreloadingBytes() += bytes; + asyncThreadCtx_->addPreloadingBytes(bytes); } } @@ -301,11 +300,26 @@ class DirectBufferedInput : public BufferedInput { connector::AsyncThreadCtx* asyncThreadCtx) : load(std::move(load)), prefetchMemoryPercent_(prefetchMemoryPercent), - asyncThreadCtx(asyncThreadCtx) { + asyncThreadCtx(asyncThreadCtx), + inGuard_(asyncThreadCtx) { BOLT_CHECK(asyncThreadCtx); preloadBytesLimit_ = asyncThreadCtx->preloadBytesLimit(); } + AsyncLoadHolder(const AsyncLoadHolder&) = delete; + AsyncLoadHolder& operator=(const AsyncLoadHolder&) = delete; + + AsyncLoadHolder(AsyncLoadHolder&& other) noexcept + : 
load(std::move(other.load)), + prefetchMemoryPercent_(other.prefetchMemoryPercent_), + asyncThreadCtx(other.asyncThreadCtx), + preloadBytesLimit_(other.preloadBytesLimit_), + inGuard_(std::move(other.inGuard_)), + addedBytes_(other.addedBytes_) { + other.asyncThreadCtx = nullptr; + other.addedBytes_ = 0; + } + bool canPreload() const { static int maxAttempt = 1000; static int sleepMs = 500; @@ -322,13 +336,15 @@ class DirectBufferedInput : public BufferedInput { // there are in preloading with high memory usage, sleep to avoid OOM { std::lock_guard lock(asyncThreadCtx->getMutex()); - auto& inPreloadingBytes = asyncThreadCtx->inPreloadingBytes(); // must preserve memory for other part of scan, i.e decompressed data - if ((inPreloadingBytes + load->preloadBytes() < + if ((asyncThreadCtx->inPreloadingBytesUntracked() + + load->preloadBytes() < preloadBytesLimit_ * prefetchMemoryPercent_ / 100.0 && - inPreloadingBytes + load->preloadBytes() + memoryBytes < + asyncThreadCtx->inPreloadingBytesUntracked() + + load->preloadBytes() + memoryBytes < preloadBytesLimit_ / 2)) { - inPreloadingBytes += load->preloadBytes(); + asyncThreadCtx->addPreloadingBytesUntracked(load->preloadBytes()); + addedBytes_ = load->preloadBytes(); return true; } } @@ -344,7 +360,7 @@ class DirectBufferedInput : public BufferedInput { << " s, pool_.currentBytes(): " << memoryBytes << " preloadBytesLimit: " << preloadBytesLimit_ << " inPreloadingBytes: " - << asyncThreadCtx->inPreloadingBytes() + << asyncThreadCtx->inPreloadingBytesUntracked() << " preloadBytes: " << load->preloadBytes() << " prefetchMemoryPercent: " << prefetchMemoryPercent_ << ", pls reduce preload IO threads or add memory"; @@ -360,8 +376,14 @@ class DirectBufferedInput : public BufferedInput { int32_t prefetchMemoryPercent_{30}; connector::AsyncThreadCtx* asyncThreadCtx; uint64_t preloadBytesLimit_{0}; + connector::AsyncThreadCtx::Guard inGuard_; + + mutable int64_t addedBytes_{0}; ~AsyncLoadHolder() { + if (asyncThreadCtx && 
addedBytes_ > 0) { + asyncThreadCtx->addPreloadingBytes(-addedBytes_); + } // Release the load reference before the memory pool reference. // This is to make sure the memory pool is not destroyed before we free // up the allocated buffers. This is to handle the case that the diff --git a/bolt/exec/TableScan.cpp b/bolt/exec/TableScan.cpp index cc1ac3d8..88ab2713 100644 --- a/bolt/exec/TableScan.cpp +++ b/bolt/exec/TableScan.cpp @@ -28,6 +28,7 @@ * -------------------------------------------------------------------------- */ +#include "bolt/connectors/hive/HiveConnectorSplit.h" #include #include #include @@ -78,9 +79,9 @@ TableScan::TableScan( driverCtx_->queryConfig().tableScanGetOutputTimeLimitMs()), enableEstimateBytesPerRow_( driverCtx_->queryConfig().iskEstimateRowSizeBasedOnSampleEnabled()), - asyncThreadCtx_( + asyncThreadCtx_(std::make_shared<connector::AsyncThreadCtx>( driverCtx_->queryConfig().preloadBytesLimit(), - driverCtx_->queryConfig().adaptivePreloadEnabled()) { + driverCtx_->queryConfig().adaptivePreloadEnabled())) { for (const auto& type : asRowType(outputType_)->children()) { if (!type->isFixedWidth()) { isFixedWidthOutputType_ = false; @@ -290,7 +291,7 @@ RowVectorPtr TableScan::getOutput() { planNodeId(), connectorPool_, nullptr, - &asyncThreadCtx_); + asyncThreadCtx_.get()); dataSource_ = connector_->createDataSource( outputType_, tableHandle_, @@ -462,7 +463,8 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) { planNodeId(), connectorPool_, nullptr, - &asyncThreadCtx_), + asyncThreadCtx_.get()), + asyncThreadCtx = asyncThreadCtx_, task = operatorCtx_->task(), pendingDynamicFilters = pendingDynamicFilters_, split]() -> std::unique_ptr<connector::DataSource> { @@ -493,7 +495,7 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) { void TableScan::checkPreload() { auto executor = connector_->executor(); if (maxSplitPreloadPerDriver_ == 0 || !executor || - !connector_->supportsSplitPreload() || !asyncThreadCtx_.allowPreload()) { + !connector_->supportsSplitPreload() || !asyncThreadCtx_->allowPreload()) { return; } if
(dataSource_->allPrefetchIssued()) { @@ -504,7 +506,14 @@ void TableScan::checkPreload() { [executor, this](std::shared_ptr<connector::ConnectorSplit> split) { preload(split); - executor->add([connectorSplit = split]() mutable { + auto hiveSplit = std::dynamic_pointer_cast< + const connector::hive::HiveConnectorSplit>(split); + int64_t preloadBytes = hiveSplit ? hiveSplit->length : 0; + connector::AsyncThreadCtx::Guard guard( + asyncThreadCtx_.get(), preloadBytes); + executor->add([connectorSplit = split, + ctx = asyncThreadCtx_, + inGuard = std::move(guard)]() mutable { connectorSplit->dataSource->prepare(); connectorSplit.reset(); }); @@ -576,7 +585,7 @@ void TableScan::close() { uint64_t waitMs; { MicrosecondTimer timer(&waitMs); - asyncThreadCtx_.wait(); + asyncThreadCtx_->wait(); } LOG_IF(INFO, waitMs > 60000) diff --git a/bolt/exec/TableScan.h b/bolt/exec/TableScan.h index 351870bb..3089e0a3 100644 --- a/bolt/exec/TableScan.h +++ b/bolt/exec/TableScan.h @@ -175,6 +175,6 @@ class TableScan : public SourceOperator { uint64_t outputRows_{0}; - connector::AsyncThreadCtx asyncThreadCtx_; + std::shared_ptr<connector::AsyncThreadCtx> asyncThreadCtx_; }; } // namespace bytedance::bolt::exec diff --git a/bolt/exec/tests/TableScanTest.cpp b/bolt/exec/tests/TableScanTest.cpp index d20f30f7..e78ed298 100644 --- a/bolt/exec/tests/TableScanTest.cpp +++ b/bolt/exec/tests/TableScanTest.cpp @@ -1703,7 +1703,23 @@ TEST_F(TableScanTest, preloadingSplitClose) { }); } ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks()); - auto task = assertQuery(tableScanNode(), filePaths, "SELECT * FROM tmp", 2); + std::shared_ptr<Task> task; + { + // Unblock the IO thread pool after a short delay to allow the task to finish. + // This is necessary because TableScan::close() waits for pending preloads, + // and if the IO threads are blocked, the preloads will never complete.
+ std::thread unblocker([&batons]() { + /* sleep override */ + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + for (auto& baton : batons) { + baton.post(); + } + }); + + task = assertQuery(tableScanNode(), filePaths, "SELECT * FROM tmp", 2); + unblocker.join(); + } + auto stats = getTableScanRuntimeStats(task); // Verify that split preloading is enabled. @@ -1713,10 +1729,6 @@ TEST_F(TableScanTest, preloadingSplitClose) { // Once all task references are cleared, the count of deleted tasks should // promptly match the count of created tasks. ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks()); - // Clean blocking items in the IO thread pool. - for (auto& baton : batons) { - baton.post(); - } latch.wait(); }