71 changes: 65 additions & 6 deletions bolt/connectors/Connector.h
@@ -378,12 +378,12 @@ class AsyncThreadCtx {
}

void in() {
std::unique_lock lock(mutex_);
std::scoped_lock lock(mutex_);
numIn_++;
cv_.notify_one();
}
void out() {
std::unique_lock lock(mutex_);
std::scoped_lock lock(mutex_);
numIn_--;
cv_.notify_one();
}
@@ -401,12 +401,26 @@ class AsyncThreadCtx {
return mutex_;
}

int64_t& inPreloadingBytes() {
int64_t preloadBytesLimit() const {
return preloadBytesLimit_;
}

void addPreloadingBytes(int64_t bytes) {
std::scoped_lock lock(mutex_);
addPreloadingBytesUntracked(bytes);
}

int64_t inPreloadingBytes() const {
std::scoped_lock lock(mutex_);
return inPreloadingBytesUntracked();
}

int64_t inPreloadingBytesUntracked() const {
return inPreloadingBytes_;
}

int64_t preloadBytesLimit() const {
return preloadBytesLimit_;
void addPreloadingBytesUntracked(int64_t bytes) {
inPreloadingBytes_ += bytes;
}

void disallowPreload() {
@@ -417,11 +431,56 @@
}

bool allowPreload() {
if (adaptive_ && allowPreload_.load()) {
std::scoped_lock lock(mutex_);
return inPreloadingBytes_ < preloadBytesLimit_;
}
return allowPreload_.load();
}

class Guard {
public:
Guard(AsyncThreadCtx* ctx, int64_t bytes = 0) : ctx_(ctx), bytes_(bytes) {
if (ctx_) {
ctx_->in();
ctx_->addPreloadingBytes(bytes_);
}
}

~Guard() {
if (ctx_) {
ctx_->out();
ctx_->addPreloadingBytes(-bytes_);
}
}

Guard(const Guard&) = delete;
Guard& operator=(const Guard&) = delete;

Guard(Guard&& other) noexcept : ctx_(other.ctx_), bytes_(other.bytes_) {
other.ctx_ = nullptr;
}

Guard& operator=(Guard&& other) noexcept {
if (this != &other) {
if (ctx_) {
ctx_->out();
ctx_->addPreloadingBytes(-bytes_);
}
ctx_ = other.ctx_;
bytes_ = other.bytes_;
other.ctx_ = nullptr;
}
return *this;
}

private:
AsyncThreadCtx* ctx_;
int64_t bytes_;
};

private:
std::mutex mutex_;
mutable std::mutex mutex_;
int numIn_{0};
int64_t inPreloadingBytes_{0};
int64_t preloadBytesLimit_{0};
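The new `AsyncThreadCtx::Guard` is an RAII helper: construction calls `in()` and charges `bytes` against the preload budget, destruction calls `out()` and refunds them, and because it is move-only it can be handed off to whatever finally completes the work. A minimal usage sketch, not part of this PR; the include path, namespace alias, and `doPreload` callback are assumptions:

```cpp
#include <cstdint>
#include <functional>

#include "bolt/connectors/Connector.h"  // assumed path for AsyncThreadCtx

namespace connector = bytedance::bolt::connector;  // namespace assumed

// Hypothetical worker: the guard tracks one in-flight preload for the whole
// call, including early returns and exceptions thrown by doPreload().
void runPreload(connector::AsyncThreadCtx* ctx,
                int64_t estimatedBytes,
                const std::function<void()>& doPreload) {
  connector::AsyncThreadCtx::Guard guard(ctx, estimatedBytes);
  doPreload();
}  // ~Guard(): out() and addPreloadingBytes(-estimatedBytes)
```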
4 changes: 0 additions & 4 deletions bolt/dwio/common/DirectBufferedInput.cpp
@@ -246,10 +246,6 @@ void DirectBufferedInput::readRegions(
if (asyncLoad.load->state() != DirectCoalescedLoad::State::kPlanned) {
return;
}
// the load is valid, so asyncThreadCtx is not freed yet.
auto guard =
folly::makeGuard([&]() { asyncLoad.asyncThreadCtx->out(); });
asyncLoad.asyncThreadCtx->in(); // trace in-flight loading
// first check that available memory allows preloading the data; even if not,
// the non-preload load will be sync loaded on the main thread.
if (asyncLoad.canPreload()) {
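For reference, the lines deleted above did the same in/out bookkeeping by hand with `folly::ScopeGuard`; the change moves that responsibility into `AsyncLoadHolder`, which now owns an `AsyncThreadCtx::Guard`. A sketch of the old pattern, with a hypothetical enclosing function:

```cpp
#include <folly/ScopeGuard.h>

#include "bolt/connectors/Connector.h"  // assumed path

namespace connector = bytedance::bolt::connector;  // namespace assumed

// Mark a load in-flight and undo it on every exit path of this scope.
void trackWhileScheduling(connector::AsyncThreadCtx* ctx) {
  auto onExit = folly::makeGuard([ctx] { ctx->out(); });
  ctx->in();  // trace in-flight loading
  // ... decide whether to preload and schedule the load ...
}
```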
38 changes: 30 additions & 8 deletions bolt/dwio/common/DirectBufferedInput.h
@@ -200,8 +200,7 @@ class DirectBufferedInput : public BufferedInput {

void updatePreloadingBytes(int64_t bytes) {
if (asyncThreadCtx_) {
std::lock_guard<std::mutex> lock(asyncThreadCtx_->getMutex());
asyncThreadCtx_->inPreloadingBytes() += bytes;
asyncThreadCtx_->addPreloadingBytes(bytes);
}
}

@@ -301,11 +300,26 @@ class DirectBufferedInput : public BufferedInput {
connector::AsyncThreadCtx* asyncThreadCtx)
: load(std::move(load)),
prefetchMemoryPercent_(prefetchMemoryPercent),
asyncThreadCtx(asyncThreadCtx) {
asyncThreadCtx(asyncThreadCtx),
inGuard_(asyncThreadCtx) {
BOLT_CHECK(asyncThreadCtx);
preloadBytesLimit_ = asyncThreadCtx->preloadBytesLimit();
}

AsyncLoadHolder(const AsyncLoadHolder&) = delete;
AsyncLoadHolder& operator=(const AsyncLoadHolder&) = delete;

AsyncLoadHolder(AsyncLoadHolder&& other) noexcept
: load(std::move(other.load)),
prefetchMemoryPercent_(other.prefetchMemoryPercent_),
asyncThreadCtx(other.asyncThreadCtx),
preloadBytesLimit_(other.preloadBytesLimit_),
inGuard_(std::move(other.inGuard_)),
addedBytes_(other.addedBytes_) {
other.asyncThreadCtx = nullptr;
other.addedBytes_ = 0;
}

bool canPreload() const {
static int maxAttempt = 1000;
static int sleepMs = 500;
@@ -322,13 +336,15 @@
// there are loads in preloading with high memory usage; sleep to avoid OOM
{
std::lock_guard<std::mutex> lock(asyncThreadCtx->getMutex());
auto& inPreloadingBytes = asyncThreadCtx->inPreloadingBytes();
// must preserve memory for other parts of the scan, i.e., decompressed data
if ((inPreloadingBytes + load->preloadBytes() <
if ((asyncThreadCtx->inPreloadingBytesUntracked() +
load->preloadBytes() <
preloadBytesLimit_ * prefetchMemoryPercent_ / 100.0 &&
inPreloadingBytes + load->preloadBytes() + memoryBytes <
asyncThreadCtx->inPreloadingBytesUntracked() +
load->preloadBytes() + memoryBytes <
preloadBytesLimit_ / 2)) {
inPreloadingBytes += load->preloadBytes();
asyncThreadCtx->addPreloadingBytesUntracked(load->preloadBytes());
addedBytes_ = load->preloadBytes();
return true;
}
}
@@ -344,7 +360,7 @@
<< " s, pool_.currentBytes(): " << memoryBytes
<< " preloadBytesLimit: " << preloadBytesLimit_
<< " inPreloadingBytes: "
<< asyncThreadCtx->inPreloadingBytes()
<< asyncThreadCtx->inPreloadingBytesUntracked()
<< " preloadBytes: " << load->preloadBytes()
<< " prefetchMemoryPercent: " << prefetchMemoryPercent_
<< ", pls reduce preload IO threads or add memory";
@@ -360,8 +376,14 @@
int32_t prefetchMemoryPercent_{30};
connector::AsyncThreadCtx* asyncThreadCtx;
uint64_t preloadBytesLimit_{0};
connector::AsyncThreadCtx::Guard inGuard_;

mutable int64_t addedBytes_{0};

~AsyncLoadHolder() {
if (asyncThreadCtx && addedBytes_ > 0) {
asyncThreadCtx->addPreloadingBytes(-addedBytes_);
}
// Release the load reference before the memory pool reference.
// This is to make sure the memory pool is not destroyed before we free
// up the allocated buffers. This is to handle the case that the
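The two-part admission check in `AsyncLoadHolder::canPreload()` can be read as a pure function: the first clause caps preloaded (still compressed) bytes at `prefetchMemoryPercent` of the limit, and the second keeps preloads plus live pool memory under half the limit so decompression and the rest of the scan have headroom. A hedged distillation with hypothetical names; the real code evaluates this under `asyncThreadCtx`'s mutex and retries with sleeps:

```cpp
#include <cstdint>

// inFlightBytes: bytes already reserved by other preloads.
// loadBytes:     this load's preload size.
// poolBytes:     current usage of the scan's memory pool.
bool admitPreload(int64_t inFlightBytes,
                  int64_t loadBytes,
                  int64_t poolBytes,
                  int64_t preloadBytesLimit,
                  int32_t prefetchMemoryPercent) {
  const bool underPreloadShare = inFlightBytes + loadBytes <
      preloadBytesLimit * prefetchMemoryPercent / 100.0;
  const bool underTotalBudget =
      inFlightBytes + loadBytes + poolBytes < preloadBytesLimit / 2;
  return underPreloadShare && underTotalBudget;
}
```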
23 changes: 16 additions & 7 deletions bolt/exec/TableScan.cpp
@@ -28,6 +28,7 @@
* --------------------------------------------------------------------------
*/

#include <folly/ScopeGuard.h>
#include <glog/logging.h>
#include <cstdint>
#include <memory>
@@ -78,9 +79,9 @@ TableScan::TableScan(
driverCtx_->queryConfig().tableScanGetOutputTimeLimitMs()),
enableEstimateBytesPerRow_(
driverCtx_->queryConfig().iskEstimateRowSizeBasedOnSampleEnabled()),
asyncThreadCtx_(
asyncThreadCtx_(std::make_shared<connector::AsyncThreadCtx>(
driverCtx_->queryConfig().preloadBytesLimit(),
driverCtx_->queryConfig().adaptivePreloadEnabled()) {
driverCtx_->queryConfig().adaptivePreloadEnabled())) {
for (const auto& type : asRowType(outputType_)->children()) {
if (!type->isFixedWidth()) {
isFixedWidthOutputType_ = false;
@@ -290,7 +291,7 @@ RowVectorPtr TableScan::getOutput() {
planNodeId(),
connectorPool_,
nullptr,
&asyncThreadCtx_);
asyncThreadCtx_.get());
dataSource_ = connector_->createDataSource(
outputType_,
tableHandle_,
@@ -462,7 +463,8 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) {
planNodeId(),
connectorPool_,
nullptr,
&asyncThreadCtx_),
asyncThreadCtx_.get()),
asyncThreadCtx = asyncThreadCtx_,
task = operatorCtx_->task(),
pendingDynamicFilters = pendingDynamicFilters_,
split]() -> std::unique_ptr<connector::DataSource> {
@@ -493,7 +495,7 @@ void TableScan::preload(std::shared_ptr<connector::ConnectorSplit> split) {
void TableScan::checkPreload() {
auto executor = connector_->executor();
if (maxSplitPreloadPerDriver_ == 0 || !executor ||
!connector_->supportsSplitPreload() || !asyncThreadCtx_.allowPreload()) {
!connector_->supportsSplitPreload() || !asyncThreadCtx_->allowPreload()) {
return;
}
if (dataSource_->allPrefetchIssued()) {
@@ -504,7 +506,14 @@
[executor, this](std::shared_ptr<connector::ConnectorSplit> split) {
preload(split);

executor->add([connectorSplit = split]() mutable {
auto hiveSplit = std::dynamic_pointer_cast<
const connector::hive::HiveConnectorSplit>(split);
int64_t preloadBytes = hiveSplit ? hiveSplit->length : 0;
connector::AsyncThreadCtx::Guard guard(
asyncThreadCtx_.get(), preloadBytes);
executor->add([connectorSplit = split,
ctx = asyncThreadCtx_,
inGuard = std::move(guard)]() mutable {
connectorSplit->dataSource->prepare();
connectorSplit.reset();
});
@@ -576,7 +585,7 @@ void TableScan::close() {
uint64_t waitMs;
{
MicrosecondTimer timer(&waitMs);
asyncThreadCtx_.wait();
asyncThreadCtx_->wait();
}

LOG_IF(INFO, waitMs > 60000)
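The `checkPreload()` change estimates the split's preload size from `HiveConnectorSplit::length`, charges it through a `Guard`, and moves both the guard and a `shared_ptr` to the context into the IO task, so the accounting is released only when `prepare()` finishes and the context outlives the operator. A condensed sketch with hypothetical function and parameter names:

```cpp
#include <cstdint>
#include <memory>
#include <utility>

#include <folly/Executor.h>

#include "bolt/connectors/Connector.h"  // assumed path

namespace connector = bytedance::bolt::connector;  // namespace assumed

// Charge `estimatedBytes` up front; the moved-in guard refunds it when the
// IO task is destroyed, and the captured shared_ptr keeps the ctx alive.
void schedulePrepare(folly::Executor* executor,
                     std::shared_ptr<connector::AsyncThreadCtx> ctx,
                     std::shared_ptr<connector::ConnectorSplit> split,
                     int64_t estimatedBytes) {
  connector::AsyncThreadCtx::Guard guard(ctx.get(), estimatedBytes);
  executor->add([split = std::move(split),
                 ctx = std::move(ctx),
                 guard = std::move(guard)]() mutable {
    split->dataSource->prepare();
    split.reset();
  });
}
```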
2 changes: 1 addition & 1 deletion bolt/exec/TableScan.h
@@ -175,6 +175,6 @@ class TableScan : public SourceOperator {

uint64_t outputRows_{0};

connector::AsyncThreadCtx asyncThreadCtx_;
std::shared_ptr<connector::AsyncThreadCtx> asyncThreadCtx_;
};
} // namespace bytedance::bolt::exec
22 changes: 17 additions & 5 deletions bolt/exec/tests/TableScanTest.cpp
@@ -1703,7 +1703,23 @@ TEST_F(TableScanTest, preloadingSplitClose) {
});
}
ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks());
auto task = assertQuery(tableScanNode(), filePaths, "SELECT * FROM tmp", 2);
std::shared_ptr<Task> task;
{
// Unblock the IO thread pool after a short delay to allow the task to finish.
// This is necessary because TableScan::close() waits for pending preloads,
// and if the IO threads are blocked, the preloads will never complete.
std::thread unblocker([&batons]() {
/* sleep override */
std::this_thread::sleep_for(std::chrono::milliseconds(200));
for (auto& baton : batons) {
baton.post();
}
});

task = assertQuery(tableScanNode(), filePaths, "SELECT * FROM tmp", 2);
unblocker.join();
}

auto stats = getTableScanRuntimeStats(task);

// Verify that split preloading is enabled.
@@ -1713,10 +1729,6 @@
// Once all task references are cleared, the count of deleted tasks should
// promptly match the count of created tasks.
ASSERT_EQ(Task::numCreatedTasks(), Task::numDeletedTasks());
// Clean blocking items in the IO thread pool.
for (auto& baton : batons) {
baton.post();
}
latch.wait();
}
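The test fix above boils down to posting the batons from a side thread while the main thread is still inside `assertQuery()`, since `TableScan::close()` now blocks on in-flight preloads and posting them only afterwards would deadlock. The same pattern in isolation; the container type and delay are arbitrary:

```cpp
#include <chrono>
#include <thread>

#include <folly/synchronization/Baton.h>

// Post every baton after a short delay so work blocked on them can complete
// while the caller waits on the query. Join the returned thread afterwards.
template <typename BatonContainer>
std::thread makeUnblocker(BatonContainer& batons) {
  return std::thread([&batons] {
    /* sleep override */
    std::this_thread::sleep_for(std::chrono::milliseconds(200));
    for (auto& baton : batons) {
      baton.post();
    }
  });
}
```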

Expand Down