Skip to content

Commit

Permalink
[Enhancement] Support limit dir size for spill (#26787)
Browse files Browse the repository at this point in the history
Signed-off-by: caneGuy <[email protected]>
  • Loading branch information
caneGuy authored Aug 24, 2023
1 parent fbfb7ed commit d752509
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 2 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -908,6 +908,10 @@ CONF_mInt32(spill_init_partition, "16");
// The maximum size of a single log block container file, this is not a hard limit.
// If the file size exceeds this limit, a new file will be created to store the block.
CONF_Int64(spill_max_log_block_container_bytes, "10737418240"); // 10GB
// The maximum fraction of a disk's capacity that a spill directory may use when the
// spill directory is on the same disk as a storage path. Spill returns an error once
// the used size exceeds this limit.
CONF_mDouble(spill_max_dir_bytes_ratio, "0.8"); // 80%

CONF_Int32(internal_service_query_rpc_thread_num, "-1");

Expand Down
21 changes: 20 additions & 1 deletion be/src/exec/spill/dir_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,22 @@ Status DirManager::init(const std::string& spill_dirs) {
} else {
iter++;
}
auto iter_tmp = iter;
for (; iter_tmp != spill_local_storage_paths.end(); ++iter_tmp) {
if (is_same_disk(path, iter_tmp->path)) {
return Status::InvalidArgument(fmt::format(
"spill_local_storage_dir {} has the same disk path with {}, please use another path", path,
iter_tmp->path));
}
}
}
if (spill_local_storage_paths.empty()) {
return Status::InvalidArgument("cannot find available spill_local_storage_dir");
}

std::regex query_id_pattern("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$");

double max_dir_size_ratio = config::spill_max_dir_bytes_ratio;
for (const auto& path : spill_local_storage_paths) {
std::string spill_dir_path = path.path;
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(spill_dir_path));
Expand All @@ -76,7 +85,17 @@ Status DirManager::init(const std::string& spill_dirs) {
}
return true;
}));
_dirs.emplace_back(std::make_shared<Dir>(spill_dir_path, std::move(fs)));
ASSIGN_OR_RETURN(auto space_info, FileSystem::Default()->space(spill_dir_path));
int64_t max_dir_size = space_info.capacity;
for (const auto& storage_path : storage_paths) {
if (is_same_disk(spill_dir_path, storage_path)) {
max_dir_size = max_dir_size * max_dir_size_ratio;
// do not consider multi spill directory on the same disk
// we will return error state during initialize
break;
}
}
_dirs.emplace_back(std::make_shared<Dir>(spill_dir_path, std::move(fs), max_dir_size));
}
return Status::OK();
}
Expand Down
20 changes: 19 additions & 1 deletion be/src/exec/spill/dir_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <sys/statfs.h>

#include <atomic>
#include <limits>
#include <memory>

Expand All @@ -27,14 +29,23 @@ namespace starrocks::spill {
// @TODO(silverbullet233): maintain some stats, such as the capacity
// A spill directory: its path, the file system handle used to access it, a byte
// limit, and a running counter of bytes written into it.
// This class only stores the limit and the counter; it does not enforce the limit.
class Dir {
public:
    // @param dir           path of the spill directory
    // @param fs            file system handle associated with this directory
    // @param max_dir_size  byte limit for this directory; defaults to the largest
    //                      int64_t value so that callers which do not supply a limit
    //                      get a well-defined (effectively unlimited) value
    Dir(std::string dir, std::shared_ptr<FileSystem> fs,
        int64_t max_dir_size = std::numeric_limits<int64_t>::max())
            : _dir(std::move(dir)), _fs(std::move(fs)), _max_size(max_dir_size) {}

    // Non-owning pointer to the file system handle.
    FileSystem* fs() const { return _fs.get(); }
    // Path of the directory (returned by value).
    std::string dir() const { return _dir; }

    // Current value of the byte counter (atomic load).
    int64_t get_current_size() const { return _current_size.load(); }

    // Overwrites the byte counter (atomic store).
    // NOTE: a get_current_size() followed by set_current_size() is two separate
    // atomic operations, not one atomic read-modify-write.
    void set_current_size(int64_t value) { _current_size.store(value); }

    // Byte limit supplied at construction.
    int64_t get_max_size() const { return _max_size; }

private:
    std::string _dir;
    std::shared_ptr<FileSystem> _fs;
    int64_t _max_size;
    // Explicitly zero-initialized: a default-constructed std::atomic<int64_t> holds an
    // indeterminate value before C++20, which would make the first size check read garbage.
    std::atomic<int64_t> _current_size{0};
};
using DirPtr = std::shared_ptr<Dir>;

Expand All @@ -55,6 +66,13 @@ class DirManager {
StatusOr<Dir*> acquire_writable_dir(const AcquireDirOptions& opts);

private:
// Reports whether `path1` and `path2` live on the same file system, judged by
// comparing the file system ids (f_fsid) that statfs() returns for each path.
//
// Returns false if statfs() fails for either path (e.g. the path does not exist):
// without a successful call there is no valid id to compare, and the stat buffers
// must not be read.
bool is_same_disk(const std::string& path1, const std::string& path2) {
    struct statfs stat1 = {};
    struct statfs stat2 = {};
    if (statfs(path1.c_str(), &stat1) != 0 || statfs(path2.c_str(), &stat2) != 0) {
        return false;
    }
    return stat1.f_fsid.__val[0] == stat2.f_fsid.__val[0] && stat1.f_fsid.__val[1] == stat2.f_fsid.__val[1];
}

std::atomic<size_t> _idx = 0;
std::vector<DirPtr> _dirs;
};
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/spill/log_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,15 @@ class LogBlock : public Block {
// Appends all slices in `data` to the underlying container.
// Before writing, checks the directory's recorded size plus the incoming bytes
// against the directory's limit and returns Status::Aborted if it would be exceeded.
// On success, adds the written byte count to this block's size and to the
// directory's recorded size.
Status append(const std::vector<Slice>& data) override {
    // Total number of bytes carried by the incoming slices.
    size_t total_size = 0;
    for (const Slice& piece : data) {
        total_size += piece.size;
    }

    const bool within_limit =
            !(_container->dir()->get_current_size() + total_size > _container->dir()->get_max_size());
    if (!within_limit) {
        return Status::Aborted(
                fmt::format("Dir current used size has exceeded limit {}! Current size {}, total_size {}!",
                            _container->dir()->get_max_size(), _container->dir()->get_current_size(), total_size));
    }

    RETURN_IF_ERROR(_container->ensure_preallocate(total_size));
    RETURN_IF_ERROR(_container->append_data(data));

    _size += total_size;
    // NOTE(review): this is a separate read followed by a write on the directory's
    // counter, not one atomic increment — confirm whether several blocks can append
    // to the same directory concurrently.
    _container->dir()->set_current_size(_container->dir()->get_current_size() + total_size);
    return Status::OK();
}

Expand Down

0 comments on commit d752509

Please sign in to comment.