From add2b29a0064216e73937c8a5baaf86cee10cbdb Mon Sep 17 00:00:00 2001 From: caneGuy Date: Fri, 30 Jun 2023 12:08:22 +0800 Subject: [PATCH] [Enhancement] Support limit dir size for spill Signed-off-by: caneGuy --- be/src/common/config.h | 4 ++++ be/src/exec/spill/dir_manager.cpp | 3 ++- be/src/exec/spill/dir_manager.h | 16 +++++++++++++++- be/src/exec/spill/log_block_manager.cpp | 4 ++++ 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/be/src/common/config.h b/be/src/common/config.h index 4e8c0478b2ed28..7890b76421f49a 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -876,6 +876,10 @@ CONF_mInt32(spill_init_partition, "16"); // The maximum size of a single log block container file, this is not a hard limit. // If the file size exceeds this limit, a new file will be created to store the block. CONF_Int64(spill_max_log_block_container_bytes, "10737418240"); // 10GB +// The maximum size of a single spill directory, for some case the spill directory may +// be the same with storage path. Spill will return with error when used size has exceeded +// the limit. +CONF_Int64(spill_max_dir_bytes, "214748364800"); // 200GB // Now, only get_info is processed by _async_thread_pool, and only needs a small number of threads. // The default value is set as the THREAD_POOL_SIZE of RoutineLoadTaskScheduler of FE. diff --git a/be/src/exec/spill/dir_manager.cpp b/be/src/exec/spill/dir_manager.cpp index 9701f9c4ac7430..a7aa6576e5a3c0 100644 --- a/be/src/exec/spill/dir_manager.cpp +++ b/be/src/exec/spill/dir_manager.cpp @@ -59,6 +59,7 @@ Status DirManager::init() { std::regex query_id_pattern("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"); + int64_t max_dir_size = config::spill_max_dir_bytes; for (const auto& path : spill_local_storage_paths) { std::string spill_dir_path = path.path; ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(spill_dir_path)); @@ -76,7 +77,7 @@ Status DirManager::init() { } return true; })); - _dirs.emplace_back(std::make_shared(spill_dir_path, std::move(fs))); + _dirs.emplace_back(std::make_shared(spill_dir_path, std::move(fs), max_dir_size)); } return Status::OK(); } diff --git a/be/src/exec/spill/dir_manager.h b/be/src/exec/spill/dir_manager.h index 0e8b9add5b7bb7..d18f1f5d33ce1d 100644 --- a/be/src/exec/spill/dir_manager.h +++ b/be/src/exec/spill/dir_manager.h @@ -27,14 +27,28 @@ namespace starrocks::spill { // @TODO(silverbullet233): maintain some stats, such as the capacity class Dir { public: - Dir(std::string dir, std::shared_ptr fs) : _dir(std::move(dir)), _fs(fs) {} + Dir(std::string dir, std::shared_ptr fs, int64_t max_dir_size):_dir(std::move(dir)), _fs(fs), _max_size(max_dir_size) {} FileSystem* fs() const { return _fs.get(); } std::string dir() const { return _dir; } + int64_t get_current_size() const { + return _current_size.load(); + } + + void set_current_size(int64_t value) { + _current_size.store(value); + } + + int64_t get_max_size() const { + return _max_size; + } + private: std::string _dir; std::shared_ptr _fs; + int64_t _max_size; + std::atomic _current_size; }; using DirPtr = std::shared_ptr; diff --git a/be/src/exec/spill/log_block_manager.cpp b/be/src/exec/spill/log_block_manager.cpp index 4f620ed2d212ee..4bf0e1620fda46 100644 --- a/be/src/exec/spill/log_block_manager.cpp +++ b/be/src/exec/spill/log_block_manager.cpp @@ -167,9 +167,13 @@ class LogBlock : public Block { Status append(const std::vector& data) override { size_t total_size = 0; std::for_each(data.begin(), data.end(), [&](const Slice& slice) { total_size += slice.size; }); + if (_container->dir()->get_current_size() + total_size > _container->dir()->get_max_size()) { + return Status::Aborted("Dir current used size has exceeded limit!"); + } RETURN_IF_ERROR(_container->ensure_preallocate(total_size)); RETURN_IF_ERROR(_container->append_data(data)); _size += total_size; + _container->dir()->set_current_size(_container->dir()->get_current_size() + total_size); return Status::OK(); }