diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4e8c0478b2ed28..7890b76421f49a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -876,6 +876,10 @@ CONF_mInt32(spill_init_partition, "16");
// The maximum size of a single log block container file, this is not a hard limit.
// If the file size exceeds this limit, a new file will be created to store the block.
CONF_Int64(spill_max_log_block_container_bytes, "10737418240"); // 10GB
+// The maximum size of a single spill directory, for some case the spill directory may
+// be the same with storage path. Spill will return with error when used size has exceeded
+// the limit.
+CONF_Int64(spill_max_dir_bytes, "214748364800"); // 200GB
// Now, only get_info is processed by _async_thread_pool, and only needs a small number of threads.
// The default value is set as the THREAD_POOL_SIZE of RoutineLoadTaskScheduler of FE.
diff --git a/be/src/exec/spill/dir_manager.cpp b/be/src/exec/spill/dir_manager.cpp
index 9701f9c4ac7430..a7aa6576e5a3c0 100644
--- a/be/src/exec/spill/dir_manager.cpp
+++ b/be/src/exec/spill/dir_manager.cpp
@@ -59,6 +59,7 @@ Status DirManager::init() {
std::regex query_id_pattern("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$");
+ int64_t max_dir_size = config::spill_max_dir_bytes;
for (const auto& path : spill_local_storage_paths) {
std::string spill_dir_path = path.path;
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(spill_dir_path));
@@ -76,7 +77,7 @@ Status DirManager::init() {
}
return true;
}));
- _dirs.emplace_back(std::make_shared
(spill_dir_path, std::move(fs)));
+ _dirs.emplace_back(std::make_shared(spill_dir_path, std::move(fs), max_dir_size));
}
return Status::OK();
}
diff --git a/be/src/exec/spill/dir_manager.h b/be/src/exec/spill/dir_manager.h
index 0e8b9add5b7bb7..d18f1f5d33ce1d 100644
--- a/be/src/exec/spill/dir_manager.h
+++ b/be/src/exec/spill/dir_manager.h
@@ -27,14 +27,28 @@ namespace starrocks::spill {
// @TODO(silverbullet233): maintain some stats, such as the capacity
class Dir {
public:
- Dir(std::string dir, std::shared_ptr fs) : _dir(std::move(dir)), _fs(fs) {}
+ Dir(std::string dir, std::shared_ptr fs, int64_t max_dir_size):_dir(std::move(dir)), _fs(fs), _max_size(max_dir_size) {}
FileSystem* fs() const { return _fs.get(); }
std::string dir() const { return _dir; }
+ int64_t get_current_size() const {
+ return _current_size.load();
+ }
+
+ void set_current_size(int64_t value) {
+ _current_size.store(value);
+ }
+
+ int64_t get_max_size() const {
+ return _max_size;
+ }
+
private:
std::string _dir;
std::shared_ptr _fs;
+ int64_t _max_size;
+ std::atomic _current_size;
};
using DirPtr = std::shared_ptr;
diff --git a/be/src/exec/spill/log_block_manager.cpp b/be/src/exec/spill/log_block_manager.cpp
index 4f620ed2d212ee..4bf0e1620fda46 100644
--- a/be/src/exec/spill/log_block_manager.cpp
+++ b/be/src/exec/spill/log_block_manager.cpp
@@ -167,9 +167,13 @@ class LogBlock : public Block {
Status append(const std::vector& data) override {
size_t total_size = 0;
std::for_each(data.begin(), data.end(), [&](const Slice& slice) { total_size += slice.size; });
+ if (_container->dir()->get_current_size() + total_size > _container->dir()->get_max_size()) {
+ return Status::Aborted("Dir current used size has exceeded limit!");
+ }
RETURN_IF_ERROR(_container->ensure_preallocate(total_size));
RETURN_IF_ERROR(_container->append_data(data));
_size += total_size;
+ _container->dir()->set_current_size(_container->dir()->get_current_size() + total_size);
return Status::OK();
}