Skip to content

Commit

Permalink
[Enhancement] Support limit dir size for spill
Browse files Browse the repository at this point in the history
Signed-off-by: caneGuy <[email protected]>
  • Loading branch information
caneGuy committed Jul 14, 2023
1 parent 96c0dcd commit 330cea3
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 2 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,10 @@ CONF_mInt32(spill_init_partition, "16");
// The maximum size of a single log block container file, this is not a hard limit.
// If the file size exceeds this limit, a new file will be created to store the block.
CONF_Int64(spill_max_log_block_container_bytes, "10737418240"); // 10GB
// The maximum fraction of a disk's capacity that a spill directory may occupy when the
// spill directory is on the same disk as a storage path. Spill returns an error once
// the directory's used size would exceed this limit.
CONF_mDouble(spill_max_dir_bytes_ratio, "0.3"); // 30%

// Now, only get_info is processed by _async_thread_pool, and only needs a small number of threads.
// The default value is set as the THREAD_POOL_SIZE of RoutineLoadTaskScheduler of FE.
Expand Down
10 changes: 9 additions & 1 deletion be/src/exec/spill/dir_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Status DirManager::init() {

std::regex query_id_pattern("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$");

double max_dir_size_ratio = config::spill_max_dir_bytes_ratio;
for (const auto& path : spill_local_storage_paths) {
std::string spill_dir_path = path.path;
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(spill_dir_path));
Expand All @@ -76,7 +77,14 @@ Status DirManager::init() {
}
return true;
}));
_dirs.emplace_back(std::make_shared<Dir>(spill_dir_path, std::move(fs)));
ASSIGN_OR_RETURN(auto space_info, FileSystem::Default()->space(spill_dir_path));
int64_t max_dir_size = space_info.capacity;
for (const auto& storage_path : storage_paths) {
if (is_same_disk(spill_dir_path, storage_path)) {
max_dir_size = max_dir_size * max_dir_size_ratio;
}
}
_dirs.emplace_back(std::make_shared<Dir>(spill_dir_path, std::move(fs), max_dir_size));
}
return Status::OK();
}
Expand Down
20 changes: 19 additions & 1 deletion be/src/exec/spill/dir_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#pragma once

#include <sys/statfs.h>

#include <atomic>
#include <cstdint>
#include <limits>
#include <memory>

Expand All @@ -27,14 +29,23 @@ namespace starrocks::spill {
// @TODO(silverbullet233): maintain some stats, such as the capacity
// A local spill directory: its path, the file system used to access it, an upper bound
// on the bytes it may hold, and a running count of the bytes currently accounted to it.
class Dir {
public:
    // Creates a directory without an explicit size limit; get_max_size() then reports
    // the largest representable value.
    Dir(std::string dir, std::shared_ptr<FileSystem> fs) : _dir(std::move(dir)), _fs(std::move(fs)) {}

    // Creates a directory that may hold at most `max_dir_size` bytes.
    Dir(std::string dir, std::shared_ptr<FileSystem> fs, int64_t max_dir_size)
            : _dir(std::move(dir)), _fs(std::move(fs)), _max_size(max_dir_size) {}

    // Non-owning pointer to the file system backing this directory.
    FileSystem* fs() const { return _fs.get(); }
    // Path of the directory (returned by value).
    std::string dir() const { return _dir; }

    // Bytes currently accounted to this directory.
    int64_t get_current_size() const { return _current_size.load(); }

    // Overwrites the accounted byte count. This is a plain store: a caller that does
    // get_current_size() followed by set_current_size() is not performing an atomic
    // read-modify-write.
    void set_current_size(int64_t value) { _current_size.store(value); }

    // Upper bound, in bytes, on the data this directory may hold.
    int64_t get_max_size() const { return _max_size; }

private:
    std::string _dir;
    std::shared_ptr<FileSystem> _fs;
    // Defaults to "no limit" so the two-argument constructor never leaves it indeterminate.
    int64_t _max_size = std::numeric_limits<int64_t>::max();
    // Explicitly zeroed: a default-constructed std::atomic is uninitialized before C++20.
    std::atomic<int64_t> _current_size{0};
};
using DirPtr = std::shared_ptr<Dir>;

Expand All @@ -55,6 +66,13 @@ class DirManager {
StatusOr<Dir*> acquire_writable_dir(const AcquireDirOptions& opts);

private:
// Reports whether `path1` and `path2` reside on the same file system, judged by comparing
// the file system ids that statfs(2) reports for each path.
// Returns false when either path cannot be queried: after a failed statfs() call the
// output buffer holds no meaningful id, so the paths cannot be shown to share a disk.
bool is_same_disk(const std::string& path1, const std::string& path2) {
    struct statfs stat1 {};
    struct statfs stat2 {};
    if (statfs(path1.c_str(), &stat1) != 0 || statfs(path2.c_str(), &stat2) != 0) {
        return false;
    }
    // f_fsid is a two-word identifier; both words must match.
    return stat1.f_fsid.__val[0] == stat2.f_fsid.__val[0] && stat1.f_fsid.__val[1] == stat2.f_fsid.__val[1];
}

std::atomic<size_t> _idx = 0;
std::vector<DirPtr> _dirs;
};
Expand Down
5 changes: 5 additions & 0 deletions be/src/exec/spill/log_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,14 @@ class LogBlock : public Block {
// Appends every slice in `data` to this block's container.
// Before writing, the request is rejected with Status::Aborted if the bytes already
// accounted to the container's directory plus the bytes of this request exceed the
// directory's maximum size. Errors from preallocation or from the container append are
// propagated unchanged. On success the block size and the directory's accounted size
// both grow by the number of bytes appended.
// NOTE(review): the directory counter is updated with a separate read and write, so
// concurrent appends to the same directory can lose updates.
Status append(const std::vector<Slice>& data) override {
    // Sum the payload of all slices in this request.
    size_t total_size = 0;
    for (const Slice& slice : data) {
        total_size += slice.size;
    }

    const auto& spill_dir = _container->dir();
    const bool over_limit = spill_dir->get_current_size() + total_size > spill_dir->get_max_size();
    if (over_limit) {
        return Status::Aborted(fmt::format("Dir current used size has exceeded limit {}! Current size {}, total_size {}!",
                                           spill_dir->get_max_size(), spill_dir->get_current_size(), total_size));
    }

    RETURN_IF_ERROR(_container->ensure_preallocate(total_size));
    RETURN_IF_ERROR(_container->append_data(data));

    _size += total_size;
    spill_dir->set_current_size(spill_dir->get_current_size() + total_size);
    return Status::OK();
}

Expand Down

0 comments on commit 330cea3

Please sign in to comment.