diff --git a/be/src/common/config.h b/be/src/common/config.h
index afca9e29be2e3..9e4e75b6446aa 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -908,6 +908,10 @@ CONF_mInt32(spill_init_partition, "16");
// The maximum size of a single log block container file, this is not a hard limit.
// If the file size exceeds this limit, a new file will be created to store the block.
CONF_Int64(spill_max_log_block_container_bytes, "10737418240"); // 10GB
+// The maximum size of a single spill directory, for some case the spill directory may
+// be the same with storage path. Spill will return with error when used size has exceeded
+// the limit.
+CONF_mDouble(spill_max_dir_bytes_ratio, "0.8"); // 80%
CONF_Int32(internal_service_query_rpc_thread_num, "-1");
diff --git a/be/src/exec/spill/dir_manager.cpp b/be/src/exec/spill/dir_manager.cpp
index 94eeece5f211c..b562bc241b581 100644
--- a/be/src/exec/spill/dir_manager.cpp
+++ b/be/src/exec/spill/dir_manager.cpp
@@ -52,6 +52,14 @@ Status DirManager::init(const std::string& spill_dirs) {
} else {
iter++;
}
+ auto iter_tmp = iter;
+ for (; iter_tmp != spill_local_storage_paths.end(); ++iter_tmp) {
+ if (is_same_disk(path, iter_tmp->path)) {
+ return Status::InvalidArgument(fmt::format(
+ "spill_local_storage_dir {} has the same disk path with {}, please use another path", path,
+ iter_tmp->path));
+ }
+ }
}
if (spill_local_storage_paths.empty()) {
return Status::InvalidArgument("cannot find available spill_local_storage_dir");
@@ -59,6 +67,7 @@ Status DirManager::init(const std::string& spill_dirs) {
std::regex query_id_pattern("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$");
+ double max_dir_size_ratio = config::spill_max_dir_bytes_ratio;
for (const auto& path : spill_local_storage_paths) {
std::string spill_dir_path = path.path;
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(spill_dir_path));
@@ -76,7 +85,17 @@ Status DirManager::init(const std::string& spill_dirs) {
}
return true;
}));
- _dirs.emplace_back(std::make_shared
(spill_dir_path, std::move(fs)));
+ ASSIGN_OR_RETURN(auto space_info, FileSystem::Default()->space(spill_dir_path));
+ int64_t max_dir_size = space_info.capacity;
+ for (const auto& storage_path : storage_paths) {
+ if (is_same_disk(spill_dir_path, storage_path)) {
+ max_dir_size = max_dir_size * max_dir_size_ratio;
+ // do not consider multi spill directory on the same disk
+ // we will return error state during initialize
+ break;
+ }
+ }
+ _dirs.emplace_back(std::make_shared(spill_dir_path, std::move(fs), max_dir_size));
}
return Status::OK();
}
diff --git a/be/src/exec/spill/dir_manager.h b/be/src/exec/spill/dir_manager.h
index 286ceaa4ff162..8cfc29db4042f 100644
--- a/be/src/exec/spill/dir_manager.h
+++ b/be/src/exec/spill/dir_manager.h
@@ -14,6 +14,8 @@
#pragma once
+#include
+
#include
#include
@@ -27,14 +29,23 @@ namespace starrocks::spill {
// @TODO(silverbullet233): maintain some stats, such as the capacity
class Dir {
public:
- Dir(std::string dir, std::shared_ptr fs) : _dir(std::move(dir)), _fs(fs) {}
+ Dir(std::string dir, std::shared_ptr fs, int64_t max_dir_size)
+ : _dir(std::move(dir)), _fs(fs), _max_size(max_dir_size) {}
FileSystem* fs() const { return _fs.get(); }
std::string dir() const { return _dir; }
+ int64_t get_current_size() const { return _current_size.load(); }
+
+ void set_current_size(int64_t value) { _current_size.store(value); }
+
+ int64_t get_max_size() const { return _max_size; }
+
private:
std::string _dir;
std::shared_ptr _fs;
+ int64_t _max_size;
+ std::atomic _current_size;
};
using DirPtr = std::shared_ptr;
@@ -55,6 +66,13 @@ class DirManager {
StatusOr acquire_writable_dir(const AcquireDirOptions& opts);
private:
+ bool is_same_disk(const std::string& path1, const std::string& path2) {
+ struct statfs stat1, stat2;
+ statfs(path1.c_str(), &stat1);
+ statfs(path2.c_str(), &stat2);
+ return stat1.f_fsid.__val[0] == stat2.f_fsid.__val[0] && stat1.f_fsid.__val[1] == stat2.f_fsid.__val[1];
+ }
+
std::atomic _idx = 0;
std::vector _dirs;
};
diff --git a/be/src/exec/spill/log_block_manager.cpp b/be/src/exec/spill/log_block_manager.cpp
index 4f620ed2d212e..2d4dca22dc86c 100644
--- a/be/src/exec/spill/log_block_manager.cpp
+++ b/be/src/exec/spill/log_block_manager.cpp
@@ -167,9 +167,15 @@ class LogBlock : public Block {
Status append(const std::vector& data) override {
size_t total_size = 0;
std::for_each(data.begin(), data.end(), [&](const Slice& slice) { total_size += slice.size; });
+ if (_container->dir()->get_current_size() + total_size > _container->dir()->get_max_size()) {
+ return Status::Aborted(
+ fmt::format("Dir current used size has exceeded limit {}! Current size {}, total_size {}!",
+ _container->dir()->get_max_size(), _container->dir()->get_current_size(), total_size));
+ }
RETURN_IF_ERROR(_container->ensure_preallocate(total_size));
RETURN_IF_ERROR(_container->append_data(data));
_size += total_size;
+ _container->dir()->set_current_size(_container->dir()->get_current_size() + total_size);
return Status::OK();
}