diff --git a/be/src/common/config.h b/be/src/common/config.h index 12baa4aa4d2f2..b4fa3159132b4 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -960,6 +960,10 @@ CONF_Int32(spill_max_partition_size, "1024"); // The maximum size of a single log block container file, this is not a hard limit. // If the file size exceeds this limit, a new file will be created to store the block. CONF_Int64(spill_max_log_block_container_bytes, "10737418240"); // 10GB +// The maximum size of a single spill directory, for some case the spill directory may +// be the same with storage path. Spill will return with error when used size has exceeded +// the limit. +CONF_mDouble(spill_max_dir_bytes_ratio, "0.8"); // 80% CONF_Int32(internal_service_query_rpc_thread_num, "-1"); diff --git a/be/src/exec/spill/dir_manager.cpp b/be/src/exec/spill/dir_manager.cpp index 9701f9c4ac743..9f623cb8a1d61 100644 --- a/be/src/exec/spill/dir_manager.cpp +++ b/be/src/exec/spill/dir_manager.cpp @@ -52,6 +52,14 @@ Status DirManager::init() { } else { iter++; } + auto iter_tmp = iter; + for (; iter_tmp != spill_local_storage_paths.end(); ++iter_tmp) { + if (is_same_disk(path, iter_tmp->path)) { + return Status::InvalidArgument(fmt::format( + "spill_local_storage_dir {} has the same disk path with {}, please use another path", path, + iter_tmp->path)); + } + } } if (spill_local_storage_paths.empty()) { return Status::InvalidArgument("cannot find available spill_local_storage_dir"); @@ -59,6 +67,7 @@ Status DirManager::init() { std::regex query_id_pattern("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"); + double max_dir_size_ratio = config::spill_max_dir_bytes_ratio; for (const auto& path : spill_local_storage_paths) { std::string spill_dir_path = path.path; ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(spill_dir_path)); @@ -76,7 +85,17 @@ Status DirManager::init() { } return true; })); - _dirs.emplace_back(std::make_shared(spill_dir_path, std::move(fs))); + ASSIGN_OR_RETURN(auto space_info, FileSystem::Default()->space(spill_dir_path)); + int64_t max_dir_size = space_info.capacity; + for (const auto& storage_path : storage_paths) { + if (is_same_disk(spill_dir_path, storage_path)) { + max_dir_size = max_dir_size * max_dir_size_ratio; + // do not consider multi spill directory on the same disk + // we will return error state during initialize + break; + } + } + _dirs.emplace_back(std::make_shared(spill_dir_path, std::move(fs), max_dir_size)); } return Status::OK(); } diff --git a/be/src/exec/spill/dir_manager.h b/be/src/exec/spill/dir_manager.h index 0e8b9add5b7bb..25bbec5cb7c90 100644 --- a/be/src/exec/spill/dir_manager.h +++ b/be/src/exec/spill/dir_manager.h @@ -14,6 +14,8 @@ #pragma once +#include + #include #include @@ -27,14 +29,23 @@ namespace starrocks::spill { // @TODO(silverbullet233): maintain some stats, such as the capacity class Dir { public: - Dir(std::string dir, std::shared_ptr fs) : _dir(std::move(dir)), _fs(fs) {} + Dir(std::string dir, std::shared_ptr fs, int64_t max_dir_size) + : _dir(std::move(dir)), _fs(fs), _max_size(max_dir_size) {} FileSystem* fs() const { return _fs.get(); } std::string dir() const { return _dir; } + int64_t get_current_size() const { return _current_size.load(); } + + void set_current_size(int64_t value) { _current_size.store(value); } + + int64_t get_max_size() const { return _max_size; } + private: std::string _dir; std::shared_ptr _fs; + int64_t _max_size; + std::atomic _current_size; }; using DirPtr = std::shared_ptr; @@ -55,6 +66,13 @@ class DirManager { StatusOr acquire_writable_dir(const AcquireDirOptions& opts); private: + bool is_same_disk(const std::string& path1, const std::string& path2) { + struct statfs stat1, stat2; + statfs(path1.c_str(), &stat1); + statfs(path2.c_str(), &stat2); + return stat1.f_fsid.__val[0] == stat2.f_fsid.__val[0] && stat1.f_fsid.__val[1] == stat2.f_fsid.__val[1]; + } + std::atomic _idx = 0; std::vector _dirs; }; diff --git a/be/src/exec/spill/log_block_manager.cpp b/be/src/exec/spill/log_block_manager.cpp index 4f620ed2d212e..2d4dca22dc86c 100644 --- a/be/src/exec/spill/log_block_manager.cpp +++ b/be/src/exec/spill/log_block_manager.cpp @@ -167,9 +167,15 @@ class LogBlock : public Block { Status append(const std::vector& data) override { size_t total_size = 0; std::for_each(data.begin(), data.end(), [&](const Slice& slice) { total_size += slice.size; }); + if (_container->dir()->get_current_size() + total_size > _container->dir()->get_max_size()) { + return Status::Aborted( + fmt::format("Dir current used size has exceeded limit {}! Current size {}, total_size {}!", + _container->dir()->get_max_size(), _container->dir()->get_current_size(), total_size)); + } RETURN_IF_ERROR(_container->ensure_preallocate(total_size)); RETURN_IF_ERROR(_container->append_data(data)); _size += total_size; + _container->dir()->set_current_size(_container->dir()->get_current_size() + total_size); return Status::OK(); }