Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions src/paimon/core/utils/file_store_path_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,24 +219,44 @@ Result<std::string> FileStorePathFactory::GetPartitionString(const BinaryRow& pa
if (partition.GetSizeInBytes() == 0) {
return Status::Invalid("invalid binary row partition");
}
auto iter = row_to_str_cache_.find(partition);
if (PAIMON_LIKELY(iter != row_to_str_cache_.end())) {
return iter->second;
{
std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
auto iter = row_to_str_cache_.find(partition);
if (PAIMON_LIKELY(iter != row_to_str_cache_.end())) {
return iter->second;
}
}

std::vector<std::pair<std::string, std::string>> part_values;
PAIMON_ASSIGN_OR_RAISE(part_values, partition_computer_->GeneratePartitionVector(partition));
PAIMON_ASSIGN_OR_RAISE(std::string part_str,
PartitionPathUtils::GeneratePartitionPath(part_values))
PartitionPathUtils::GeneratePartitionPath(part_values));

std::unique_lock<std::shared_mutex> write_lock(cache_mutex_);
auto iter = row_to_str_cache_.find(partition);
if (PAIMON_LIKELY(iter != row_to_str_cache_.end())) {
return iter->second;
}
return row_to_str_cache_.insert({partition, part_str}).first->second;
}

/// Converts a partition spec (partition key -> value) into its BinaryRow and caches the result
/// in `map_to_row_cache_`.
///
/// The cache is probed first under a shared lock on `cache_mutex_`. On a miss, the row is built
/// by `partition_computer_->ToBinaryRow()` with no lock held and is then published under an
/// exclusive lock on the same mutex.
///
/// @param partition Partition key/value map to convert.
/// @return A copy of the cached (or newly inserted) BinaryRow. On conversion failure,
///         PAIMON_ASSIGN_OR_RAISE is expected to return the error status early (macro is
///         defined elsewhere -- confirm).
Result<BinaryRow> FileStorePathFactory::ToBinaryRow(
    const std::map<std::string, std::string>& partition) const {
    {
        // Fast path: read-only lookup; the shared lock is released at the end of this scope.
        std::shared_lock<std::shared_mutex> read_lock(cache_mutex_);
        auto iter = map_to_row_cache_.find(partition);
        if (PAIMON_LIKELY(iter != map_to_row_cache_.end())) {
            return iter->second;
        }
    }

    // Build the row exactly once, before taking the exclusive lock, so the conversion work is
    // not performed while other threads are blocked on the cache.
    PAIMON_ASSIGN_OR_RAISE(BinaryRow row, partition_computer_->ToBinaryRow(partition));

    std::unique_lock<std::shared_mutex> write_lock(cache_mutex_);
    // Re-check: another thread may have inserted this key after the shared lock was released.
    auto iter = map_to_row_cache_.find(partition);
    if (PAIMON_LIKELY(iter != map_to_row_cache_.end())) {
        return iter->second;
    }
    return map_to_row_cache_.insert({partition, row}).first->second;
}

Expand Down
5 changes: 4 additions & 1 deletion src/paimon/core/utils/file_store_path_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>
Expand Down Expand Up @@ -97,7 +99,7 @@ class FileStorePathFactory : public std::enable_shared_from_this<FileStorePathFa
return global_index_external_path_;
}

/// @note Thread-safe for concurrent calls. Cache accesses are protected by internal locks.
Result<std::string> GetPartitionString(const BinaryRow& partition) const;
std::string NewManifestFile() const {
return PathUtil::JoinPath(
Expand Down Expand Up @@ -167,6 +169,7 @@ class FileStorePathFactory : public std::enable_shared_from_this<FileStorePathFa

mutable std::map<std::map<std::string, std::string>, BinaryRow> map_to_row_cache_;
mutable std::unordered_map<BinaryRow, std::string> row_to_str_cache_;
mutable std::shared_mutex cache_mutex_;
};

} // namespace paimon
89 changes: 89 additions & 0 deletions src/paimon/core/utils/file_store_path_factory_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

#include "paimon/core/utils/file_store_path_factory.h"

#include <atomic>
#include <mutex>
#include <optional>
#include <thread>
#include <variant>

#include "arrow/type.h"
Expand Down Expand Up @@ -358,6 +361,92 @@ TEST_F(FileStorePathFactoryTest, TestToBinaryRowAndToPartitionString) {
}
}

// Drives ToBinaryRow() and GetPartitionString() on a single shared factory from several threads
// at once and verifies that every map -> row -> string round trip yields the expected path.
TEST_F(FileStorePathFactoryTest, TestConcurrentToBinaryRowAndGetPartitionString) {
    auto dir = UniqueTestDirectory::Create();
    ASSERT_TRUE(dir);

    arrow::FieldVector fields = {
        arrow::field("f0", arrow::boolean()),
        arrow::field("f1", arrow::int8()),
        arrow::field("f2", arrow::int8()),
        arrow::field("f3", arrow::int16()),
        arrow::field("f4", arrow::int16()),
        arrow::field("f5", arrow::int32()),
        arrow::field("hr", arrow::int32()),
        arrow::field("f7", arrow::int64()),
        arrow::field("dt", arrow::utf8()),
        arrow::field("f9", arrow::float32()),
        arrow::field("f10", arrow::float64()),
        arrow::field("f11", arrow::utf8()),
        arrow::field("f12", arrow::binary()),
        arrow::field("non-partition-field", arrow::int32())};
    auto schema = arrow::schema(fields);

    ASSERT_OK_AND_ASSIGN(
        std::shared_ptr<FileStorePathFactory> path_factory,
        FileStorePathFactory::Create(dir->Str(), schema, {"dt", "hr"}, "default",
                                     /*identifier=*/"mock_format", /*data_file_prefix=*/"data-",
                                     /*legacy_partition_name_enabled=*/true,
                                     /*external_paths=*/std::vector<std::string>(),
                                     /*global_index_external_path=*/std::nullopt,
                                     /*index_file_in_data_file_dir=*/false, mem_pool_));

    // Each entry pairs a partition spec with the partition path it must map to.
    const std::vector<std::pair<std::map<std::string, std::string>, std::string>> test_cases = {
        {{{"dt", "20211224"}, {"hr", "23"}}, "dt=20211224/hr=23/"},
        {{{"dt", "default"}, {"hr", "17"}}, "dt=default/hr=17/"},
        {{{"dt", "20240812"}, {"hr", "5"}}, "dt=20240812/hr=5/"},
        {{{"dt", " a "}, {"hr", "22"}}, "dt= a /hr=22/"},
    };

    std::atomic<bool> has_error = false;
    std::mutex error_mutex;
    std::string first_error;

    // Flags the run as failed; only the first reported message is retained.
    auto report_failure = [&](const std::string& message) {
        std::lock_guard<std::mutex> guard(error_mutex);
        has_error.store(true, std::memory_order_relaxed);
        if (first_error.empty()) {
            first_error = message;
        }
    };

    constexpr int32_t kThreadNum = 8;
    constexpr int32_t kIterationsPerThread = 500;

    // Per-thread workload. The starting case is offset by the thread id so that threads touch
    // different cache keys on the same round. A worker stops at its first failure.
    auto worker = [&](int32_t thread_id) {
        for (int32_t round = 0; round < kIterationsPerThread; ++round) {
            const auto& test_case = test_cases[(thread_id + round) % test_cases.size()];
            const auto& partition_map = test_case.first;
            const auto& expected = test_case.second;

            auto row_result = path_factory->ToBinaryRow(partition_map);
            if (!row_result.ok()) {
                report_failure("ToBinaryRow failed: " + row_result.status().ToString());
                return;
            }

            auto partition_str_result = path_factory->GetPartitionString(row_result.value());
            if (!partition_str_result.ok()) {
                report_failure("GetPartitionString failed: " +
                               partition_str_result.status().ToString());
                return;
            }

            if (partition_str_result.value() != expected) {
                report_failure("Unexpected partition string, expected=" + expected +
                               ", actual=" + partition_str_result.value());
                return;
            }
        }
    };

    std::vector<std::thread> threads;
    threads.reserve(kThreadNum);
    for (int32_t thread_id = 0; thread_id < kThreadNum; ++thread_id) {
        threads.emplace_back(worker, thread_id);
    }

    for (auto& worker_thread : threads) {
        worker_thread.join();
    }

    ASSERT_FALSE(has_error.load(std::memory_order_relaxed)) << first_error;
}

TEST_F(FileStorePathFactoryTest, TestCreateIndexFileFactory) {
{
// test without external path & index_file_in_data_file_dir = false
Expand Down
Loading