Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Support limit dir size for spill #26787

Merged
merged 3 commits into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,10 @@ CONF_mInt32(spill_init_partition, "16");
// The maximum size of a single log block container file, this is not a hard limit.
// If the file size exceeds this limit, a new file will be created to store the block.
CONF_Int64(spill_max_log_block_container_bytes, "10737418240"); // 10GB
// Fraction of a disk's capacity that a spill directory may use when it is on the same
// disk as a storage path; otherwise the directory's limit is the full disk capacity.
// Appending spill data returns an error once the directory's used size would exceed
// its limit.
CONF_mDouble(spill_max_dir_bytes_ratio, "0.8"); // 80%

// Now, only get_info is processed by _async_thread_pool, and only needs a small number of threads.
// The default value is set as the THREAD_POOL_SIZE of RoutineLoadTaskScheduler of FE.
Expand Down
21 changes: 20 additions & 1 deletion be/src/exec/spill/dir_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,22 @@ Status DirManager::init() {
} else {
iter++;
}
auto iter_tmp = iter;
for (; iter_tmp != spill_local_storage_paths.end(); ++iter_tmp) {
if (is_same_disk(path, iter_tmp->path)) {
return Status::InvalidArgument(fmt::format(
"spill_local_storage_dir {} has the same disk path with {}, please use another path", path,
iter_tmp->path));
}
}
}
if (spill_local_storage_paths.empty()) {
return Status::InvalidArgument("cannot find available spill_local_storage_dir");
}

std::regex query_id_pattern("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$");

double max_dir_size_ratio = config::spill_max_dir_bytes_ratio;
for (const auto& path : spill_local_storage_paths) {
std::string spill_dir_path = path.path;
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(spill_dir_path));
Expand All @@ -76,7 +85,17 @@ Status DirManager::init() {
}
return true;
}));
_dirs.emplace_back(std::make_shared<Dir>(spill_dir_path, std::move(fs)));
ASSIGN_OR_RETURN(auto space_info, FileSystem::Default()->space(spill_dir_path));
int64_t max_dir_size = space_info.capacity;
for (const auto& storage_path : storage_paths) {
if (is_same_disk(spill_dir_path, storage_path)) {
max_dir_size = max_dir_size * max_dir_size_ratio;
// do not consider multi spill directory on the same disk
// we will return error state during initialize
break;
}
}
_dirs.emplace_back(std::make_shared<Dir>(spill_dir_path, std::move(fs), max_dir_size));
}
return Status::OK();
}
Expand Down
20 changes: 19 additions & 1 deletion be/src/exec/spill/dir_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <sys/statfs.h>

#include <atomic>
#include <limits>
#include <memory>

Expand All @@ -27,14 +29,23 @@ namespace starrocks::spill {
// @TODO(silverbullet233): maintain some stats, such as the capacity
// A spill directory: the path, the FileSystem used to access it, a byte budget
// (`_max_size`) and a running count of bytes accounted to it (`_current_size`).
class Dir {
public:
    // @param dir           path of the spill directory
    // @param fs            filesystem handle used to access `dir`
    // @param max_dir_size  byte budget reported by get_max_size(); defaults to the
    //                      largest int64_t so two-argument construction still works
    //                      and imposes no practical limit.
    Dir(std::string dir, std::shared_ptr<FileSystem> fs,
        int64_t max_dir_size = std::numeric_limits<int64_t>::max())
            : _dir(std::move(dir)), _fs(std::move(fs)), _max_size(max_dir_size) {}

    // Non-owning access to the filesystem handle.
    FileSystem* fs() const { return _fs.get(); }
    // Returns a copy of the directory path.
    std::string dir() const { return _dir; }

    // Bytes currently accounted to this directory.
    int64_t get_current_size() const { return _current_size.load(); }

    // Overwrites the accounted size. A get_current_size() followed by
    // set_current_size() is two separate atomic operations, not one atomic update.
    void set_current_size(int64_t value) { _current_size.store(value); }

    // Byte budget fixed at construction.
    int64_t get_max_size() const { return _max_size; }

private:
    std::string _dir;
    std::shared_ptr<FileSystem> _fs;
    int64_t _max_size;
    // Explicitly zero-initialized: a default-constructed std::atomic holds an
    // indeterminate value before C++20, and the size is read before any store.
    std::atomic<int64_t> _current_size{0};
};
using DirPtr = std::shared_ptr<Dir>;

Expand All @@ -55,6 +66,13 @@ class DirManager {
StatusOr<Dir*> acquire_writable_dir(const AcquireDirOptions& opts);

private:
// Reports whether `path1` and `path2` are on the same filesystem, judged by
// comparing the filesystem IDs that statfs(2) returns for each path.
// Returns false when either path cannot be inspected: the statfs buffers are
// only meaningful after a successful call, so a failed lookup must not be
// compared.
bool is_same_disk(const std::string& path1, const std::string& path2) {
    struct statfs stat1 {};
    struct statfs stat2 {};
    if (statfs(path1.c_str(), &stat1) != 0 || statfs(path2.c_str(), &stat2) != 0) {
        return false;
    }
    return stat1.f_fsid.__val[0] == stat2.f_fsid.__val[0] && stat1.f_fsid.__val[1] == stat2.f_fsid.__val[1];
}

std::atomic<size_t> _idx = 0;
std::vector<DirPtr> _dirs;
};
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/spill/log_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,15 @@ class LogBlock : public Block {
// Appends all slices in `data` to this block's container.
// Fails with Status::Aborted, before writing anything, when the directory's
// current size plus the incoming bytes would exceed the directory's maximum
// size. On success, both this block's size and the directory's accounted size
// grow by the number of bytes appended.
Status append(const std::vector<Slice>& data) override {
    // Total the payload first so the limit check and preallocation cover the whole write.
    size_t total_size = 0;
    for (const Slice& slice : data) {
        total_size += slice.size;
    }

    const bool exceeds_limit =
            _container->dir()->get_current_size() + total_size > _container->dir()->get_max_size();
    if (exceeds_limit) {
        return Status::Aborted(
                fmt::format("Dir current used size has exceeded limit {}! Current size {}, total_size {}!",
                            _container->dir()->get_max_size(), _container->dir()->get_current_size(), total_size));
    }

    RETURN_IF_ERROR(_container->ensure_preallocate(total_size));
    RETURN_IF_ERROR(_container->append_data(data));

    _size += total_size;
    // NOTE(review): this is a separate read followed by a write; if several blocks
    // append to the same directory concurrently, updates may be lost — confirm
    // whether the limit is meant to be approximate.
    const auto updated_dir_size = _container->dir()->get_current_size() + total_size;
    _container->dir()->set_current_size(updated_dir_size);
    return Status::OK();
}

Expand Down
Loading