Skip to content

Commit

Permalink
[Enhancement] Support limit dir size for spill
Browse files Browse the repository at this point in the history
Signed-off-by: caneGuy <[email protected]>
  • Loading branch information
caneGuy committed Jul 10, 2023
1 parent 96c0dcd commit add2b29
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 2 deletions.
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -876,6 +876,10 @@ CONF_mInt32(spill_init_partition, "16");
// The maximum size of a single log block container file, this is not a hard limit.
// If the file size exceeds this limit, a new file will be created to store the block.
CONF_Int64(spill_max_log_block_container_bytes, "10737418240"); // 10GB
// The maximum size of a single spill directory. In some cases the spill directory may
// be the same as a storage path. Spill returns an error once the used size has exceeded
// this limit.
CONF_Int64(spill_max_dir_bytes, "214748364800"); // 200GB

// Now, only get_info is processed by _async_thread_pool, and only needs a small number of threads.
// The default value is set as the THREAD_POOL_SIZE of RoutineLoadTaskScheduler of FE.
Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/spill/dir_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ Status DirManager::init() {

std::regex query_id_pattern("^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$");

int64_t max_dir_size = config::spill_max_dir_bytes;
for (const auto& path : spill_local_storage_paths) {
std::string spill_dir_path = path.path;
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(spill_dir_path));
Expand All @@ -76,7 +77,7 @@ Status DirManager::init() {
}
return true;
}));
_dirs.emplace_back(std::make_shared<Dir>(spill_dir_path, std::move(fs)));
_dirs.emplace_back(std::make_shared<Dir>(spill_dir_path, std::move(fs), max_dir_size));
}
return Status::OK();
}
Expand Down
16 changes: 15 additions & 1 deletion be/src/exec/spill/dir_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,28 @@ namespace starrocks::spill {
// @TODO(silverbullet233): maintain some stats, such as the capacity
// Describes one spill directory: its path, the file system used to access it, and
// simple byte accounting (a size limit plus a "currently used" counter).
//
// This class only stores the limit and the counter; it does not enforce the limit
// itself. Callers are expected to compare get_current_size() with get_max_size().
//
// Each individual load/store of the used-size counter is atomic, but a
// get_current_size() followed by set_current_size() is NOT a single atomic
// read-modify-write.
class Dir {
public:
    // Creates a directory entry without an explicit size limit. The limit is set to
    // INT64_MAX so that get_max_size() always returns a well-defined value.
    Dir(std::string dir, std::shared_ptr<FileSystem> fs) : Dir(std::move(dir), std::move(fs), INT64_MAX) {}

    // Creates a directory entry whose used size should not exceed `max_dir_size` bytes.
    Dir(std::string dir, std::shared_ptr<FileSystem> fs, int64_t max_dir_size)
            : _dir(std::move(dir)), _fs(std::move(fs)), _max_size(max_dir_size) {}

    // Returns a non-owning pointer to the file system of this directory.
    FileSystem* fs() const { return _fs.get(); }
    // Returns a copy of the directory path.
    std::string dir() const { return _dir; }

    // Returns the number of bytes currently accounted to this directory.
    int64_t get_current_size() const { return _current_size.load(); }

    // Overwrites the number of bytes currently accounted to this directory.
    void set_current_size(int64_t value) { _current_size.store(value); }

    // Returns the configured size limit, in bytes, of this directory.
    int64_t get_max_size() const { return _max_size; }

private:
    std::string _dir;
    std::shared_ptr<FileSystem> _fs;
    int64_t _max_size;
    // Explicitly zero-initialized: before C++20 a default-constructed std::atomic
    // holds an indeterminate value.
    std::atomic<int64_t> _current_size{0};
};
using DirPtr = std::shared_ptr<Dir>;

Expand Down
4 changes: 4 additions & 0 deletions be/src/exec/spill/log_block_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,13 @@ class LogBlock : public Block {
// Appends all slices in `data` to the underlying container.
//
// The write is rejected with Status::Aborted when the directory's recorded used size
// plus the bytes about to be written is greater than the directory's size limit.
// On success, both this block's size and the directory's recorded used size grow by
// the total number of bytes in `data`.
Status append(const std::vector<Slice>& data) override {
    // Total number of bytes carried by this batch of slices.
    size_t total_size = 0;
    for (const Slice& slice : data) {
        total_size += slice.size;
    }

    const auto& spill_dir = _container->dir();
    const auto projected_size = spill_dir->get_current_size() + total_size;
    if (projected_size > spill_dir->get_max_size()) {
        return Status::Aborted("Dir current used size has exceeded limit!");
    }

    RETURN_IF_ERROR(_container->ensure_preallocate(total_size));
    RETURN_IF_ERROR(_container->append_data(data));

    _size += total_size;
    // NOTE(review): the read and the write below are two separate calls, so this
    // update is not one atomic read-modify-write on the directory counter.
    spill_dir->set_current_size(spill_dir->get_current_size() + total_size);
    return Status::OK();
}

Expand Down

0 comments on commit add2b29

Please sign in to comment.