diff --git a/src/paimon/core/utils/file_store_path_factory.cpp b/src/paimon/core/utils/file_store_path_factory.cpp index c2ff72793..7a697ab62 100644 --- a/src/paimon/core/utils/file_store_path_factory.cpp +++ b/src/paimon/core/utils/file_store_path_factory.cpp @@ -219,24 +219,44 @@ Result FileStorePathFactory::GetPartitionString(const BinaryRow& pa if (partition.GetSizeInBytes() == 0) { return Status::Invalid("invalid binary row partition"); } - auto iter = row_to_str_cache_.find(partition); - if (PAIMON_LIKELY(iter != row_to_str_cache_.end())) { - return iter->second; + { + std::shared_lock read_lock(cache_mutex_); + auto iter = row_to_str_cache_.find(partition); + if (PAIMON_LIKELY(iter != row_to_str_cache_.end())) { + return iter->second; + } } + std::vector> part_values; PAIMON_ASSIGN_OR_RAISE(part_values, partition_computer_->GeneratePartitionVector(partition)); PAIMON_ASSIGN_OR_RAISE(std::string part_str, - PartitionPathUtils::GeneratePartitionPath(part_values)) + PartitionPathUtils::GeneratePartitionPath(part_values)); + + std::unique_lock write_lock(cache_mutex_); + auto iter = row_to_str_cache_.find(partition); + if (PAIMON_LIKELY(iter != row_to_str_cache_.end())) { + return iter->second; + } return row_to_str_cache_.insert({partition, part_str}).first->second; } Result FileStorePathFactory::ToBinaryRow( const std::map& partition) const { + { + std::shared_lock read_lock(cache_mutex_); + auto iter = map_to_row_cache_.find(partition); + if (PAIMON_LIKELY(iter != map_to_row_cache_.end())) { + return iter->second; + } + } + + PAIMON_ASSIGN_OR_RAISE(BinaryRow row, partition_computer_->ToBinaryRow(partition)); + + std::unique_lock write_lock(cache_mutex_); auto iter = map_to_row_cache_.find(partition); if (PAIMON_LIKELY(iter != map_to_row_cache_.end())) { return iter->second; } - PAIMON_ASSIGN_OR_RAISE(BinaryRow row, partition_computer_->ToBinaryRow(partition)); return map_to_row_cache_.insert({partition, row}).first->second; } diff --git a/src/paimon/core/utils/file_store_path_factory.h b/src/paimon/core/utils/file_store_path_factory.h index beb104b76..6a0c14009 100644 --- a/src/paimon/core/utils/file_store_path_factory.h +++ b/src/paimon/core/utils/file_store_path_factory.h @@ -20,6 +20,8 @@ #include #include #include +#include +#include #include #include #include @@ -97,7 +99,7 @@ class FileStorePathFactory : public std::enable_shared_from_this GetPartitionString(const BinaryRow& partition) const; std::string NewManifestFile() const { return PathUtil::JoinPath( @@ -167,6 +169,7 @@ class FileStorePathFactory : public std::enable_shared_from_this, BinaryRow> map_to_row_cache_; mutable std::unordered_map row_to_str_cache_; + mutable std::shared_mutex cache_mutex_; }; } // namespace paimon diff --git a/src/paimon/core/utils/file_store_path_factory_test.cpp b/src/paimon/core/utils/file_store_path_factory_test.cpp index 7c47cb8ca..3b8f6addc 100644 --- a/src/paimon/core/utils/file_store_path_factory_test.cpp +++ b/src/paimon/core/utils/file_store_path_factory_test.cpp @@ -16,7 +16,10 @@ #include "paimon/core/utils/file_store_path_factory.h" +#include +#include #include +#include #include #include "arrow/type.h" @@ -358,6 +361,92 @@ TEST_F(FileStorePathFactoryTest, TestToBinaryRowAndToPartitionString) { } } +TEST_F(FileStorePathFactoryTest, TestConcurrentToBinaryRowAndGetPartitionString) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32()), + arrow::field("hr", arrow::int32()), arrow::field("f7", arrow::int64()), + arrow::field("dt", arrow::utf8()), arrow::field("f9", arrow::float32()), + arrow::field("f10", arrow::float64()), arrow::field("f11", arrow::utf8()), + arrow::field("f12", arrow::binary()), arrow::field("non-partition-field", arrow::int32())}; + auto schema = arrow::schema(fields); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr path_factory, + FileStorePathFactory::Create(dir->Str(), schema, {"dt", "hr"}, "default", + /*identifier=*/"mock_format", /*data_file_prefix=*/"data-", + /*legacy_partition_name_enabled=*/true, + /*external_paths=*/std::vector(), + /*global_index_external_path=*/std::nullopt, + /*index_file_in_data_file_dir=*/false, mem_pool_)); + + const std::vector, std::string>> test_cases = { + {{{"dt", "20211224"}, {"hr", "23"}}, "dt=20211224/hr=23/"}, + {{{"dt", "default"}, {"hr", "17"}}, "dt=default/hr=17/"}, + {{{"dt", "20240812"}, {"hr", "5"}}, "dt=20240812/hr=5/"}, + {{{"dt", " a "}, {"hr", "22"}}, "dt= a /hr=22/"}, + }; + + std::atomic has_error = false; + std::mutex error_mutex; + std::string first_error; + + constexpr int32_t kThreadNum = 8; + constexpr int32_t kIterationsPerThread = 500; + std::vector threads; + threads.reserve(kThreadNum); + + for (int32_t thread_id = 0; thread_id < kThreadNum; ++thread_id) { + threads.emplace_back([&, thread_id]() { + for (int32_t i = 0; i < kIterationsPerThread; ++i) { + const auto& [partition_map, expected] = + test_cases[(thread_id + i) % test_cases.size()]; + + auto row_result = path_factory->ToBinaryRow(partition_map); + if (!row_result.ok()) { + std::lock_guard lock(error_mutex); + has_error.store(true, std::memory_order_relaxed); + if (first_error.empty()) { + first_error = "ToBinaryRow failed: " + row_result.status().ToString(); + } + return; + } + + auto partition_str_result = path_factory->GetPartitionString(row_result.value()); + if (!partition_str_result.ok()) { + std::lock_guard lock(error_mutex); + has_error.store(true, std::memory_order_relaxed); + if (first_error.empty()) { + first_error = "GetPartitionString failed: " + + partition_str_result.status().ToString(); + } + return; + } + + if (partition_str_result.value() != expected) { + std::lock_guard lock(error_mutex); + has_error.store(true, std::memory_order_relaxed); + if (first_error.empty()) { + first_error = "Unexpected partition string, expected=" + expected + + ", actual=" + partition_str_result.value(); + } + return; + } + } + }); + } + + for (auto& thread : threads) { + thread.join(); + } + + ASSERT_FALSE(has_error.load(std::memory_order_relaxed)) << first_error; +} + TEST_F(FileStorePathFactoryTest, TestCreateIndexFileFactory) { { // test without external path & index_file_in_data_file_dir = false