diff --git a/docs/code-style.md b/docs/code-style.md index 2da15b9ab..c2f9f4e8d 100644 --- a/docs/code-style.md +++ b/docs/code-style.md @@ -28,6 +28,21 @@ This document defines the coding conventions for the paimon-cpp project. All pul --- +## Integer Types + +Use **fixed-width integer types** from ``. Do **not** use plain `int`, `long`, `short`, or `unsigned`. + +| Use | Instead of | +|-----|-----------| +| `int8_t` / `uint8_t` | `char` (for numeric data) | +| `int16_t` / `uint16_t` | `short` | +| `int32_t` / `uint32_t` | `int` / `unsigned int` | +| `int64_t` / `uint64_t` | `long` / `long long` | + +**Exception**: Loop variables iterating over small, bounded ranges (e.g. `for (int32_t i = 0; i < n; ++i)`) must still use `int32_t`, not `int`. + +--- + ## Formatting Formatting is based on **Google C++ Style** with the following overrides (defined in `.clang-format`): diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 54bbab62a..49b149a49 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -212,7 +212,7 @@ struct PAIMON_EXPORT Options { static const char SNAPSHOT_TIME_RETAINED[]; /// "snapshot.expire.limit" - The maximum number of snapshots allowed to expire at a time. - /// Default value is 10. + /// Default value is 50. static const char SNAPSHOT_EXPIRE_LIMIT[]; /// "snapshot.clean-empty-directories" - Whether to try to clean empty directories when expiring @@ -391,7 +391,7 @@ struct PAIMON_EXPORT Options { /// reading the audit_log or binlog system tables. This is only valid for primary key tables. /// Default value is "false". static const char TABLE_READ_SEQUENCE_NUMBER_ENABLED[]; - /// "key-value.sequence-number.enabled" - Whether to include the _SEQUENCE_NUMBER field when + /// "key-value.sequence_number.enabled" - Whether to include the _SEQUENCE_NUMBER field when /// reading key-value data. This is an internal option used by AuditLogTable and BinlogTable /// when table-read.sequence-number.enabled is set to true. Default value is "false". static const char KEY_VALUE_SEQUENCE_NUMBER_ENABLED[]; diff --git a/include/paimon/write_context.h b/include/paimon/write_context.h index 06d37f622..94f9b0d46 100644 --- a/include/paimon/write_context.h +++ b/include/paimon/write_context.h @@ -40,8 +40,8 @@ class PAIMON_EXPORT WriteContext { public: WriteContext(const std::string& root_path, const std::string& commit_user, bool is_streaming_mode, bool ignore_num_bucket_check, bool ignore_previous_files, - const std::optional& write_id, const std::string& branch, - const std::vector& write_schema, + bool enable_multi_thread_spill, const std::optional& write_id, + const std::string& branch, const std::vector& write_schema, const std::shared_ptr& memory_pool, const std::shared_ptr& executor, const std::string& temp_directory, const std::shared_ptr& specific_file_system, @@ -106,6 +106,10 @@ class PAIMON_EXPORT WriteContext { return specific_file_system_; } + bool EnableMultiThreadSpill() const { + return enable_multi_thread_spill_; + } + private: std::string root_path_; std::string commit_user_; @@ -113,6 +117,7 @@ class PAIMON_EXPORT WriteContext { bool is_streaming_mode_; bool ignore_num_bucket_check_; bool ignore_previous_files_; + bool enable_multi_thread_spill_; std::optional write_id_; std::vector write_schema_; std::shared_ptr memory_pool_; @@ -222,6 +227,13 @@ class PAIMON_EXPORT WriteContextBuilder { WriteContextBuilder& WithFileSystemSchemeToIdentifierMap( const std::map& fs_scheme_to_identifier_map); + /// Set the thread number for write buffer spill operations. (default is 0) + /// If <= 0, threading is disabled for spill IPC read/write. + /// If > 0, sets arrow CPU thread pool capacity for spill operations. + /// @param thread_number The thread number to use for spill operations. + /// @return Reference to this builder for method chaining. + WriteContextBuilder& SetWriteBufferSpillThreadNumber(int32_t thread_number); + /// Build and return a `WriteContext` instance with input validation. /// @return Result containing the constructed `WriteContext` or an error status. Result> Finish(); diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index db22b01a9..9f495e55c 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -255,6 +255,7 @@ set(PAIMON_CORE_SRCS core/mergetree/merge_tree_writer.cpp core/mergetree/in_memory_sort_buffer.cpp core/mergetree/external_sort_buffer.cpp + core/mergetree/spill_file_merger.cpp core/mergetree/write_buffer.cpp core/mergetree/levels.cpp core/mergetree/lookup_file.cpp @@ -651,6 +652,7 @@ if(PAIMON_BUILD_TESTS) core/mergetree/merge_tree_writer_test.cpp core/mergetree/write_buffer_test.cpp core/mergetree/sort_buffer_test.cpp + core/mergetree/spill_file_merger_test.cpp core/mergetree/sorted_run_test.cpp core/mergetree/spill_channel_manager_test.cpp core/mergetree/spill_reader_writer_test.cpp diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 7a75c549d..23330fe7c 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -98,7 +98,7 @@ const char Options::GLOBAL_INDEX_THREAD_NUM[] = "global-index.thread-num"; const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path"; const char Options::AGGREGATION_REMOVE_RECORD_ON_DELETE[] = "aggregation.remove-record-on-delete"; const char Options::TABLE_READ_SEQUENCE_NUMBER_ENABLED[] = "table-read.sequence-number.enabled"; -const char Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED[] = "key-value.sequence-number.enabled"; +const char Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED[] = "key-value.sequence_number.enabled"; const char Options::SCAN_TIMESTAMP_MILLIS[] = "scan.timestamp-millis"; const char Options::SCAN_TIMESTAMP[] = "scan.timestamp"; const char Options::SCAN_TAG_NAME[] = "scan.tag-name"; diff --git a/src/paimon/common/fs/file_system_test.cpp b/src/paimon/common/fs/file_system_test.cpp index c3e44406f..bf177d1ab 100644 --- a/src/paimon/common/fs/file_system_test.cpp +++ b/src/paimon/common/fs/file_system_test.cpp @@ -173,7 +173,7 @@ class FileSystemTest : public ::testing::Test, public ::testing::WithParamInterf std::string test_root_; }; -TEST_P(FileSystemTest, TestNoneFileSystemFactory) { +TEST(FileSystemStaticTest, TestNoneFileSystemFactory) { std::map fs_options; Result> fs = FileSystemFactory::Get("not exist", "/tmp", fs_options); @@ -516,7 +516,7 @@ TEST_P(FileSystemTest, TestReadAndWriteAndAtomicStoreFile) { ASSERT_EQ(read_content, new_content); } -TEST_P(FileSystemTest, TestIsObjectStore) { +TEST(FileSystemStaticTest, TestIsObjectStore) { ASSERT_OK_AND_ASSIGN(bool is_object_store, FileSystem::IsObjectStore("file:///tmp/test.txt")); ASSERT_FALSE(is_object_store); ASSERT_OK_AND_ASSIGN(is_object_store, FileSystem::IsObjectStore("/tmp/test.txt")); diff --git a/src/paimon/common/global_index/btree/btree_global_index_integration_test.cpp b/src/paimon/common/global_index/btree/btree_global_index_integration_test.cpp index fcf696ad3..0b46de5e1 100644 --- a/src/paimon/common/global_index/btree/btree_global_index_integration_test.cpp +++ b/src/paimon/common/global_index/btree/btree_global_index_integration_test.cpp @@ -1491,7 +1491,7 @@ TEST_P(BTreeGlobalIndexIntegrationTest, WriteAndReadAllNull) { } } -TEST_P(BTreeGlobalIndexIntegrationTest, WriteAndReadLargeDataWithSmallBlocks) { +TEST_F(BTreeGlobalIndexIntegrationTest, WriteAndReadLargeDataWithSmallBlocks) { // Use very small block size and cache size to force multiple block evictions auto file_writer = std::make_shared(fs_, base_path_); auto field = arrow::field("int_field", arrow::int32()); @@ -1710,7 +1710,7 @@ TEST_P(BTreeGlobalIndexIntegrationTest, CreateReaderWithMultiFieldSchema) { "supposed to have single field"); } -TEST_P(BTreeGlobalIndexIntegrationTest, CreateWriterWithMissingField) { +TEST_F(BTreeGlobalIndexIntegrationTest, CreateWriterWithMissingField) { auto file_writer = std::make_shared(fs_, base_path_); auto type = arrow::struct_({arrow::field("existing_field", arrow::int32())}); auto struct_type = std::dynamic_pointer_cast(type); diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index c46bf98a6..dac20c139 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -583,8 +583,8 @@ struct CoreOptions::Impl { int32_t snapshot_num_retain_min = 10; // Parse snapshot.num-retained.max - maximum completed snapshots to retain int32_t snapshot_num_retain_max = std::numeric_limits::max(); - // Parse snapshot.expire.limit - maximum snapshots allowed to expire at a time, default 10 - int32_t snapshot_expire_limit = 10; + // Parse snapshot.expire.limit - maximum snapshots allowed to expire at a time, default 50 + int32_t snapshot_expire_limit = 50; // Parse snapshot.time-retained - maximum time of completed snapshots to retain int64_t snapshot_time_retained = 1 * 3600 * 1000; // 1 hour PAIMON_RETURN_NOT_OK( @@ -642,7 +642,7 @@ struct CoreOptions::Impl { // Parse table-read.sequence-number.enabled - expose sequence number in system tables PAIMON_RETURN_NOT_OK(parser.Parse(Options::TABLE_READ_SEQUENCE_NUMBER_ENABLED, &table_read_sequence_number_enabled)); - // Parse key-value.sequence-number.enabled - internal sequence number read switch + // Parse key-value.sequence_number.enabled - internal sequence number read switch PAIMON_RETURN_NOT_OK(parser.Parse(Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED, &key_value_sequence_number_enabled)); // Parse partial-update.remove-record-on-sequence-group diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index be2fab72e..9b968abce 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -71,7 +71,7 @@ TEST(CoreOptionsTest, TestDefaultValue) { ExpireConfig expire_config = core_options.GetExpireConfig(); ASSERT_EQ(10, expire_config.GetSnapshotRetainMin()); ASSERT_EQ(std::numeric_limits::max(), expire_config.GetSnapshotRetainMax()); - ASSERT_EQ(10, expire_config.GetSnapshotMaxDeletes()); + ASSERT_EQ(50, expire_config.GetSnapshotMaxDeletes()); ASSERT_FALSE(expire_config.CleanEmptyDirectories()); ASSERT_EQ(1 * 3600 * 1000L, expire_config.GetSnapshotTimeRetainMs()); ASSERT_EQ(std::vector(), core_options.GetSequenceField()); diff --git a/src/paimon/core/disk/file_io_channel.h b/src/paimon/core/disk/file_io_channel.h index 165add20b..fb4e817ea 100644 --- a/src/paimon/core/disk/file_io_channel.h +++ b/src/paimon/core/disk/file_io_channel.h @@ -17,7 +17,6 @@ #pragma once #include -#include #include #include @@ -68,4 +67,9 @@ class PAIMON_EXPORT FileIOChannel { }; }; +struct FileChannelInfo { + FileIOChannel::ID channel_id; + int64_t file_size; +}; + } // namespace paimon diff --git a/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter_test.cpp b/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter_test.cpp index 4b07be37e..96de1b6a3 100644 --- a/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter_test.cpp +++ b/src/paimon/core/mergetree/compact/lookup_merge_tree_compact_rewriter_test.cpp @@ -101,7 +101,8 @@ class LookupMergeTreeCompactRewriterTest : public ::testing::TestWithParam({"key"}), data_path_factory, key_comparator, /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper, /*schema_id=*/latest_schema.value()->Id(), arrow_schema_, - options, std::make_shared(), /*io_manager=*/nullptr, pool_)); + options, std::make_shared(), /*io_manager=*/nullptr, + /*enable_multi_thread_spill=*/false, pool_)); // write data ArrowArray c_src_array; diff --git a/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp index a88840b46..a7f01c4a7 100644 --- a/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp +++ b/src/paimon/core/mergetree/compact/merge_tree_compact_rewriter.cpp @@ -263,10 +263,10 @@ Result MergeTreeCompactRewriter::RewriteCompaction( ScopeGuard write_guard([&]() -> void { rolling_writer->Abort(); - merge_file_split_read_.reset(); for (const auto& reader : reader_holders) { reader->Close(); } + merge_file_split_read_.reset(); }); for (const auto& section : sections) { diff --git a/src/paimon/core/mergetree/compact/sort_merge_reader_with_loser_tree.h b/src/paimon/core/mergetree/compact/sort_merge_reader_with_loser_tree.h index d302ffec4..096f049ae 100644 --- a/src/paimon/core/mergetree/compact/sort_merge_reader_with_loser_tree.h +++ b/src/paimon/core/mergetree/compact/sort_merge_reader_with_loser_tree.h @@ -51,6 +51,9 @@ class SortMergeReaderWithLoserTree : public SortMergeReader { void Close() override { loser_tree_->Close(); + if (merge_function_wrapper_) { + merge_function_wrapper_->Reset(); + } } class Iterator : public SortMergeReader::Iterator { diff --git a/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h b/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h index c953516f4..6c1ab51b9 100644 --- a/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h +++ b/src/paimon/core/mergetree/compact/sort_merge_reader_with_min_heap.h @@ -70,6 +70,9 @@ class SortMergeReaderWithMinHeap : public SortMergeReader { for (const auto& reader : readers_holder_) { reader->Close(); } + if (merge_function_wrapper_) { + merge_function_wrapper_->Reset(); + } } std::shared_ptr GetReaderMetrics() const override { diff --git a/src/paimon/core/mergetree/external_sort_buffer.cpp b/src/paimon/core/mergetree/external_sort_buffer.cpp index 716836007..ae2f63a7e 100644 --- a/src/paimon/core/mergetree/external_sort_buffer.cpp +++ b/src/paimon/core/mergetree/external_sort_buffer.cpp @@ -16,6 +16,7 @@ #include "paimon/core/mergetree/external_sort_buffer.h" +#include #include #include @@ -46,7 +47,12 @@ Result> ExternalSortBuffer::Create( const std::shared_ptr& key_comparator, const std::shared_ptr& user_defined_seq_comparator, const CoreOptions& options, const std::shared_ptr& io_manager, - const std::shared_ptr& pool) { + bool enable_multi_thread_spill, const std::shared_ptr& pool) { + if (options.GetLocalSortMaxNumFileHandles() < kSpillMinFanIn) { + return Status::Invalid(fmt::format( + "invalid '{}': {}, must be at least {}", Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, + options.GetLocalSortMaxNumFileHandles(), kSpillMinFanIn)); + } arrow::FieldVector key_fields; key_fields.reserve(trimmed_primary_keys.size()); for (const auto& primary_key : trimmed_primary_keys) { @@ -58,9 +64,10 @@ Result> ExternalSortBuffer::Create( PAIMON_ASSIGN_OR_RAISE(std::shared_ptr spill_channel_enumerator, io_manager->CreateChannelEnumerator()); - return std::unique_ptr(new ExternalSortBuffer( - std::move(in_memory_buffer), key_schema, value_schema, key_comparator, - user_defined_seq_comparator, options, spill_channel_enumerator, pool)); + return std::unique_ptr( + new ExternalSortBuffer(std::move(in_memory_buffer), key_schema, value_schema, + key_comparator, user_defined_seq_comparator, options, + spill_channel_enumerator, enable_multi_thread_spill, pool)); } ExternalSortBuffer::ExternalSortBuffer( @@ -71,7 +78,7 @@ ExternalSortBuffer::ExternalSortBuffer( const std::shared_ptr& user_defined_seq_comparator, const CoreOptions& options, const std::shared_ptr& spill_channel_enumerator, - const std::shared_ptr& pool) + bool enable_multi_thread_spill, const std::shared_ptr& pool) : in_memory_buffer_(std::move(in_memory_buffer)), pool_(pool), key_schema_(key_schema), @@ -80,9 +87,14 @@ ExternalSortBuffer::ExternalSortBuffer( user_defined_seq_comparator_(user_defined_seq_comparator), write_schema_(SpecialFields::CompleteSequenceAndValueKindField(value_schema)), options_(options), - spill_channel_manager_(std::make_shared( - options_.GetFileSystem(), options_.GetLocalSortMaxNumFileHandles())), - spill_channel_enumerator_(spill_channel_enumerator) {} + max_fan_in_(options.GetLocalSortMaxNumFileHandles()), + enable_multi_thread_spill_(enable_multi_thread_spill), + spill_channel_manager_( + std::make_shared(options_.GetFileSystem(), max_fan_in_)), + spill_merger_(std::make_unique(max_fan_in_)), + spill_channel_enumerator_(spill_channel_enumerator), + actual_max_fan_in_(max_fan_in_), + spill_batch_size_(options_.GetWriteBatchSize()) {} ExternalSortBuffer::~ExternalSortBuffer() { DoClear(); @@ -94,7 +106,10 @@ bool ExternalSortBuffer::HasSpilledData() const { void ExternalSortBuffer::DoClear() { in_memory_buffer_->Clear(); - CleanupSpillFiles(); + + spill_channel_manager_->Reset(); + total_spill_disk_bytes_ = 0; + spill_merger_->Clear(); } void ExternalSortBuffer::Clear() { @@ -105,18 +120,41 @@ uint64_t ExternalSortBuffer::GetMemorySize() const { return in_memory_buffer_->GetMemorySize(); } +void ExternalSortBuffer::UpdateSpillParameters() { + int64_t estimated_row_size = in_memory_buffer_->GetEstimateMemoryUseForEachRow(); + if (estimated_row_size <= 0) { + return; + } + + const int32_t max_batch_size = options_.GetWriteBatchSize(); + const int32_t min_batch_size = std::min(kSpillMinBatchSize, max_batch_size); + const int64_t merge_budget = options_.GetWriteBufferSize(); + const int64_t max_memory_use_per_handle = merge_budget / max_fan_in_; + + spill_batch_size_ = max_memory_use_per_handle / estimated_row_size; + spill_batch_size_ = std::clamp(spill_batch_size_, min_batch_size, max_batch_size); + + actual_max_fan_in_ = merge_budget / (spill_batch_size_ * estimated_row_size); + actual_max_fan_in_ = std::clamp(actual_max_fan_in_, kSpillMinFanIn, max_fan_in_); + + // Re-derive spill_batch_size_ from the clamped actual_max_fan_in_ to stay within merge_budget. + spill_batch_size_ = merge_budget / (actual_max_fan_in_ * estimated_row_size); + spill_batch_size_ = std::clamp(spill_batch_size_, 1, max_batch_size); + + spill_merger_->SetMaxFanIn(actual_max_fan_in_); +} + Result ExternalSortBuffer::FlushMemory() { if (!in_memory_buffer_->HasData()) { return true; } - int64_t max_spill_disk_size = options_.GetWriteBufferSpillMaxDiskSize(); - + UpdateSpillParameters(); PAIMON_ASSIGN_OR_RAISE(std::vector> memory_buffer_readers, in_memory_buffer_->CreateReaders()); PAIMON_RETURN_NOT_OK(SpillMemoryBuffer(std::move(memory_buffer_readers))); in_memory_buffer_->Clear(); - return total_spill_disk_bytes_ < max_spill_disk_size; + return total_spill_disk_bytes_ < options_.GetWriteBufferSpillMaxDiskSize(); } Result ExternalSortBuffer::Write(std::unique_ptr&& batch) { @@ -128,11 +166,17 @@ Result ExternalSortBuffer::Write(std::unique_ptr&& batch) { } Result>> ExternalSortBuffer::CreateReaders() { - PAIMON_ASSIGN_OR_RAISE(std::vector> readers, - CollectSpillReaders()); PAIMON_ASSIGN_OR_RAISE(std::vector> memory_readers, in_memory_buffer_->CreateReaders()); + if (!HasSpilledData()) { + return memory_readers; + } + int32_t max_spill_files = actual_max_fan_in_ - 1; + PAIMON_RETURN_NOT_OK( + spill_merger_->RunFinalMergeIfNeeded(max_spill_files, CreateSpillFileMergeFn())); + PAIMON_ASSIGN_OR_RAISE(std::vector> readers, + CreateSpillReaders(spill_merger_->GetAllFiles())); readers.insert(readers.end(), std::make_move_iterator(memory_readers.begin()), std::make_move_iterator(memory_readers.end())); return readers; @@ -142,33 +186,28 @@ bool ExternalSortBuffer::HasData() const { return in_memory_buffer_->HasData() || HasSpilledData(); } -void ExternalSortBuffer::CleanupSpillFiles() { - spill_channel_manager_->Reset(); - total_spill_disk_bytes_ = 0; -} - -Result>> ExternalSortBuffer::CollectSpillReaders() - const { +Result>> ExternalSortBuffer::CreateSpillReaders( + const std::vector& files) const { std::vector> readers; - const auto& channel_ids = spill_channel_manager_->GetChannels(); - readers.reserve(channel_ids.size()); - for (const auto& channel_id : channel_ids) { - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr spill_reader, - SpillReader::Create(options_.GetFileSystem(), key_schema_, - value_schema_, pool_, channel_id)); - readers.push_back(std::move(spill_reader)); + readers.reserve(files.size()); + for (const auto& file : files) { + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr reader, + SpillReader::Create(options_.GetFileSystem(), key_schema_, value_schema_, + enable_multi_thread_spill_, file.channel_id, pool_)); + readers.push_back(std::move(reader)); } return readers; } -Result ExternalSortBuffer::SpillToDisk( +Result ExternalSortBuffer::SpillToDisk( std::vector>&& readers, int32_t write_batch_size) { const auto& spill_compress_options = options_.GetSpillCompressOptions(); PAIMON_ASSIGN_OR_RAISE( std::unique_ptr spill_writer, SpillWriter::Create(options_.GetFileSystem(), write_schema_, spill_channel_enumerator_, spill_channel_manager_, spill_compress_options.compress, - spill_compress_options.zstd_level, pool_)); + spill_compress_options.zstd_level, enable_multi_thread_spill_, pool_)); auto cleanup_guard = ScopeGuard([&]() { [[maybe_unused]] auto status = spill_channel_manager_->DeleteChannel(spill_writer->GetChannelId()); @@ -202,41 +241,37 @@ Result ExternalSortBuffer::SpillToDisk( PAIMON_RETURN_NOT_OK(spill_writer->Close()); PAIMON_ASSIGN_OR_RAISE(int64_t spilled_file_size, spill_writer->GetFileSize()); cleanup_guard.Release(); - return spilled_file_size; + return FileChannelInfo{spill_writer->GetChannelId(), spilled_file_size}; } Status ExternalSortBuffer::SpillMemoryBuffer( std::vector>&& readers) { - PAIMON_ASSIGN_OR_RAISE(int64_t spill_file_size, - SpillToDisk(std::move(readers), options_.GetWriteBatchSize())); - total_spill_disk_bytes_ += spill_file_size; - - if (options_.GetLocalSortMaxNumFileHandles() > 0 && - static_cast(spill_channel_manager_->GetChannels().size()) >= - options_.GetLocalSortMaxNumFileHandles()) { - PAIMON_RETURN_NOT_OK(MergeSpilledFiles()); - } - return Status::OK(); + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo file_info, + SpillToDisk(std::move(readers), spill_batch_size_)); + total_spill_disk_bytes_ += file_info.file_size; + spill_merger_->AddFile(file_info); + return spill_merger_->RunMergeIfNeeded(CreateSpillFileMergeFn()); } -Status ExternalSortBuffer::MergeSpilledFiles() { - if (spill_channel_manager_->GetChannels().size() < 2) { - return Status::OK(); - } - auto spill_channel_ids_before_merge = spill_channel_manager_->GetChannels(); - auto cleanup_guard = ScopeGuard([&]() { - for (const auto& spill_channel_id : spill_channel_ids_before_merge) { - [[maybe_unused]] auto status = spill_channel_manager_->DeleteChannel(spill_channel_id); - } - }); +SpillFileMerger::MergeFn ExternalSortBuffer::CreateSpillFileMergeFn() { + return [this](const std::vector& files) -> Result { + return MergeAndReplaceFiles(files); + }; +} +Result ExternalSortBuffer::MergeAndReplaceFiles( + const std::vector& files) { PAIMON_ASSIGN_OR_RAISE(std::vector> readers, - CollectSpillReaders()); - PAIMON_ASSIGN_OR_RAISE(int64_t merged_file_size, - SpillToDisk(std::move(readers), options_.GetWriteBatchSize())); - total_spill_disk_bytes_ = merged_file_size; + CreateSpillReaders(files)); + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo output, + SpillToDisk(std::move(readers), spill_batch_size_)); + total_spill_disk_bytes_ += output.file_size; - return Status::OK(); + for (const auto& file : files) { + [[maybe_unused]] auto status = spill_channel_manager_->DeleteChannel(file.channel_id); + total_spill_disk_bytes_ -= file.file_size; + } + return output; } } // namespace paimon diff --git a/src/paimon/core/mergetree/external_sort_buffer.h b/src/paimon/core/mergetree/external_sort_buffer.h index 57b14d04d..4cc4fb505 100644 --- a/src/paimon/core/mergetree/external_sort_buffer.h +++ b/src/paimon/core/mergetree/external_sort_buffer.h @@ -26,6 +26,7 @@ #include "paimon/core/disk/file_io_channel.h" #include "paimon/core/mergetree/in_memory_sort_buffer.h" #include "paimon/core/mergetree/sort_buffer.h" +#include "paimon/core/mergetree/spill_file_merger.h" #include "paimon/record_batch.h" #include "paimon/result.h" #include "paimon/status.h" @@ -52,7 +53,7 @@ class ExternalSortBuffer : public SortBuffer { const std::shared_ptr& key_comparator, const std::shared_ptr& user_defined_seq_comparator, const CoreOptions& options, const std::shared_ptr& io_manager, - const std::shared_ptr& pool); + bool enable_multi_thread_spill, const std::shared_ptr& pool); ~ExternalSortBuffer() override; void Clear() override; @@ -63,14 +64,19 @@ class ExternalSortBuffer : public SortBuffer { bool HasData() const override; private: + static constexpr int32_t kSpillMinFanIn = 2; + static constexpr int32_t kSpillMinBatchSize = 256; + void DoClear(); + void UpdateSpillParameters(); bool HasSpilledData() const; - Result>> CollectSpillReaders() const; - Result SpillToDisk(std::vector>&& readers, - int32_t write_batch_size); - Status MergeSpilledFiles(); + Result>> CreateSpillReaders( + const std::vector& files) const; + Result SpillToDisk( + std::vector>&& readers, int32_t write_batch_size); + SpillFileMerger::MergeFn CreateSpillFileMergeFn(); + Result MergeAndReplaceFiles(const std::vector& files); Status SpillMemoryBuffer(std::vector>&& readers); - void CleanupSpillFiles(); ExternalSortBuffer(std::unique_ptr&& in_memory_buffer, const std::shared_ptr& key_schema, @@ -79,7 +85,7 @@ class ExternalSortBuffer : public SortBuffer { const std::shared_ptr& user_defined_seq_comparator, const CoreOptions& options, const std::shared_ptr& spill_channel_enumerator, - const std::shared_ptr& pool); + bool enable_multi_thread_spill, const std::shared_ptr& pool); std::unique_ptr in_memory_buffer_; @@ -90,10 +96,15 @@ class ExternalSortBuffer : public SortBuffer { const std::shared_ptr user_defined_seq_comparator_; const std::shared_ptr write_schema_; const CoreOptions options_; + const int32_t max_fan_in_; + const bool enable_multi_thread_spill_; const std::shared_ptr spill_channel_manager_; + std::unique_ptr spill_merger_; std::shared_ptr spill_channel_enumerator_; int64_t total_spill_disk_bytes_ = 0; + int32_t actual_max_fan_in_; + int32_t spill_batch_size_; }; } // namespace paimon diff --git a/src/paimon/core/mergetree/in_memory_sort_buffer.cpp b/src/paimon/core/mergetree/in_memory_sort_buffer.cpp index 4bedddf92..b428ef282 100644 --- a/src/paimon/core/mergetree/in_memory_sort_buffer.cpp +++ b/src/paimon/core/mergetree/in_memory_sort_buffer.cpp @@ -53,6 +53,7 @@ InMemorySortBuffer::InMemorySortBuffer(int64_t last_sequence_number, void InMemorySortBuffer::Clear() { buffered_batches_.clear(); current_memory_in_bytes_ = 0; + total_row_count_ = 0; } uint64_t InMemorySortBuffer::GetMemorySize() const { @@ -82,7 +83,11 @@ Result InMemorySortBuffer::Write(std::unique_ptr&& moved_batc buffered_batch.struct_array = std::move(value_struct_array); buffered_batch.row_kinds = batch->GetRowKind(); next_sequence_number_ += buffered_batch.struct_array->length(); + total_row_count_ += buffered_batch.struct_array->length(); buffered_batches_.push_back(std::move(buffered_batch)); + if (total_row_count_ > 0) { + estimated_memory_use_for_each_row_ = current_memory_in_bytes_ / total_row_count_; + } return current_memory_in_bytes_ < write_buffer_size_; } @@ -107,6 +112,10 @@ bool InMemorySortBuffer::HasData() const { return !buffered_batches_.empty(); } +uint64_t InMemorySortBuffer::GetEstimateMemoryUseForEachRow() const { + return estimated_memory_use_for_each_row_; +} + // TODO(jinli.zjw): Consider making the memory estimation more accurate. // https://github.com/alibaba/paimon-cpp/pull/206#discussion_r3021325389 Result InMemorySortBuffer::EstimateMemoryUse(const std::shared_ptr& array) { diff --git a/src/paimon/core/mergetree/in_memory_sort_buffer.h b/src/paimon/core/mergetree/in_memory_sort_buffer.h index 63a000d9c..976e793b3 100644 --- a/src/paimon/core/mergetree/in_memory_sort_buffer.h +++ b/src/paimon/core/mergetree/in_memory_sort_buffer.h @@ -63,11 +63,13 @@ class InMemorySortBuffer : public SortBuffer { Result Write(std::unique_ptr&& batch) override; Result>> CreateReaders() override; bool HasData() const override; + /// Get the estimated average memory usage per row in bytes. + uint64_t GetEstimateMemoryUseForEachRow() const; + private: /// Estimate memory usage of an Arrow array. static Result EstimateMemoryUse(const std::shared_ptr& array); - private: const std::shared_ptr pool_; const std::shared_ptr value_type_; const std::vector trimmed_primary_keys_; @@ -78,6 +80,8 @@ class InMemorySortBuffer : public SortBuffer { std::vector buffered_batches_; uint64_t current_memory_in_bytes_ = 0; + uint64_t estimated_memory_use_for_each_row_ = 0; + int64_t total_row_count_ = 0; int64_t next_sequence_number_ = 0; }; diff --git a/src/paimon/core/mergetree/lookup/remote_lookup_file_manager_test.cpp b/src/paimon/core/mergetree/lookup/remote_lookup_file_manager_test.cpp index 5fea0304d..75bb98a3a 100644 --- a/src/paimon/core/mergetree/lookup/remote_lookup_file_manager_test.cpp +++ b/src/paimon/core/mergetree/lookup/remote_lookup_file_manager_test.cpp @@ -77,12 +77,12 @@ class RemoteLookupFileManagerTest : public testing::Test { std::make_shared(std::move(mfunc)); PAIMON_ASSIGN_OR_RAISE( - auto writer, - MergeTreeWriter::Create(/*last_sequence_number=*/last_sequence_number, - std::vector({"key"}), data_path_factory, - key_comparator, /*user_defined_seq_comparator=*/nullptr, - merge_function_wrapper, /*schema_id=*/0, arrow_schema_, options, - noop_compact_manager_, /*io_manager=*/nullptr, pool_)); + auto writer, MergeTreeWriter::Create( + /*last_sequence_number=*/last_sequence_number, + std::vector({"key"}), data_path_factory, key_comparator, + /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper, + /*schema_id=*/0, arrow_schema_, options, noop_compact_manager_, + /*io_manager=*/nullptr, /*enable_multi_thread_spill=*/false, pool_)); ArrowArray c_src_array; PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*src_array, &c_src_array)); diff --git a/src/paimon/core/mergetree/lookup_levels_test.cpp b/src/paimon/core/mergetree/lookup_levels_test.cpp index c91fd44ab..f6dbc2f5e 100644 --- a/src/paimon/core/mergetree/lookup_levels_test.cpp +++ b/src/paimon/core/mergetree/lookup_levels_test.cpp @@ -81,12 +81,12 @@ class LookupLevelsTest : public testing::Test { std::make_shared(std::move(mfunc)); PAIMON_ASSIGN_OR_RAISE( - auto writer, - MergeTreeWriter::Create(/*last_sequence_number=*/last_sequence_number, - std::vector({"key"}), data_path_factory, - key_comparator, /*user_defined_seq_comparator=*/nullptr, - merge_function_wrapper, /*schema_id=*/0, arrow_schema_, options, - noop_compact_manager_, /*io_manager=*/nullptr, pool_)); + auto writer, MergeTreeWriter::Create( + /*last_sequence_number=*/last_sequence_number, + std::vector({"key"}), data_path_factory, key_comparator, + /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper, + /*schema_id=*/0, arrow_schema_, options, noop_compact_manager_, + /*io_manager=*/nullptr, /*enable_multi_thread_spill=*/false, pool_)); // write data ArrowArray c_src_array; diff --git a/src/paimon/core/mergetree/merge_tree_writer.cpp b/src/paimon/core/mergetree/merge_tree_writer.cpp index 1c55c7d38..fc928ad50 100644 --- a/src/paimon/core/mergetree/merge_tree_writer.cpp +++ b/src/paimon/core/mergetree/merge_tree_writer.cpp @@ -56,13 +56,15 @@ Result> MergeTreeWriter::Create( const std::shared_ptr>& merge_function_wrapper, int64_t schema_id, const std::shared_ptr& value_schema, const CoreOptions& options, const std::shared_ptr& compact_manager, - const std::shared_ptr& io_manager, const std::shared_ptr& pool) { + const std::shared_ptr& io_manager, bool enable_multi_thread_spill, + const std::shared_ptr& pool) { auto write_schema = SpecialFields::CompleteSequenceAndValueKindField(value_schema); PAIMON_ASSIGN_OR_RAISE( std::unique_ptr write_buffer, WriteBuffer::Create(last_sequence_number, value_schema, trimmed_primary_keys, options.GetSequenceField(), key_comparator, user_defined_seq_comparator, - merge_function_wrapper, options, io_manager, pool)); + merge_function_wrapper, options, io_manager, enable_multi_thread_spill, + pool)); return std::shared_ptr( new MergeTreeWriter(pool, trimmed_primary_keys, options, path_factory, key_comparator, user_defined_seq_comparator, merge_function_wrapper, schema_id, diff --git a/src/paimon/core/mergetree/merge_tree_writer.h b/src/paimon/core/mergetree/merge_tree_writer.h index 5c14c1886..6ebde0d64 100644 --- a/src/paimon/core/mergetree/merge_tree_writer.h +++ b/src/paimon/core/mergetree/merge_tree_writer.h @@ -62,7 +62,8 @@ class MergeTreeWriter : public BatchWriter { const std::shared_ptr>& merge_function_wrapper, int64_t schema_id, const std::shared_ptr& value_schema, const CoreOptions& options, const std::shared_ptr& compact_manager, - const std::shared_ptr& io_manager, const std::shared_ptr& pool); + const std::shared_ptr& io_manager, bool enable_multi_thread_spill, + const std::shared_ptr& pool); Status Write(std::unique_ptr&& batch) override; diff --git a/src/paimon/core/mergetree/merge_tree_writer_test.cpp b/src/paimon/core/mergetree/merge_tree_writer_test.cpp index 508de4c9e..17854c09c 100644 --- a/src/paimon/core/mergetree/merge_tree_writer_test.cpp +++ b/src/paimon/core/mergetree/merge_tree_writer_test.cpp @@ -176,10 +176,10 @@ class MergeTreeWriterTest : public ::testing::TestWithParam { compact_manager ? compact_manager : noop_compact_manager_; std::shared_ptr io_manager = GetParam() ? std::make_shared(temp_dir + "/tmp", file_system_) : nullptr; - return MergeTreeWriter::Create(last_sequence_number, primary_keys_, path_factory, - key_comparator_, user_defined_seq_comparator, - merge_function_wrapper_, schema_id, value_schema_, options, - writer_compact_manager, io_manager, pool_); + return MergeTreeWriter::Create( + last_sequence_number, primary_keys_, path_factory, key_comparator_, + user_defined_seq_comparator, merge_function_wrapper_, schema_id, value_schema_, options, + writer_compact_manager, io_manager, /*enable_multi_thread_spill=*/false, pool_); } private: @@ -1076,7 +1076,7 @@ TEST_P(MergeTreeWriterTest, TestCloseSkipsDeleteForUpgradedFilesInCompactAfter) << "Intermediate file should be deleted because it's not in compact_before_"; } -TEST_P(MergeTreeWriterTest, TestSpillWithSameKeyDeduplicate) { +TEST_F(MergeTreeWriterTest, TestSpillWithSameKeyDeduplicate) { ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}, {Options::WRITE_BUFFER_SIZE, "1"}, @@ -1094,7 +1094,8 @@ TEST_P(MergeTreeWriterTest, TestSpillWithSameKeyDeduplicate) { MergeTreeWriter::Create(/*last_sequence_number=*/-1, primary_keys_, path_factory, key_comparator_, /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, /*schema_id=*/0, value_schema_, options, - noop_compact_manager_, io_manager, pool_)); + noop_compact_manager_, io_manager, + /*enable_multi_thread_spill=*/false, pool_)); std::shared_ptr batch1 = arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ @@ -1111,7 +1112,9 @@ TEST_P(MergeTreeWriterTest, TestSpillWithSameKeyDeduplicate) { WriteBatch(batch1, /*row_kinds=*/{}, merge_writer.get()); WriteBatch(batch2, /*row_kinds=*/{}, merge_writer.get()); - ASSERT_EQ(2u, TestHelper::CountChannelFiles(file_system_, dir->Str() + "/tmp")); + // WRITE_BUFFER_SIZE=1 causes UpdateSpillParameters() to clamp actual_max_fan_in_ to 2, + // triggering leveled merge after 2 spill files are produced, merging them into 1. + ASSERT_EQ(1u, TestHelper::CountChannelFiles(file_system_, dir->Str() + "/tmp")); std::shared_ptr batch3 = arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ @@ -1140,7 +1143,7 @@ TEST_P(MergeTreeWriterTest, TestSpillWithSameKeyDeduplicate) { CheckFileContent(expected_data_file_path, expected_array); } -TEST_P(MergeTreeWriterTest, TestIntermediateMergeSpillFileBound) { +TEST_F(MergeTreeWriterTest, TestIntermediateMergeSpillFileBound) { ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}, {Options::WRITE_BUFFER_SIZE, "1"}, @@ -1159,7 +1162,8 @@ TEST_P(MergeTreeWriterTest, TestIntermediateMergeSpillFileBound) { MergeTreeWriter::Create(/*last_sequence_number=*/-1, primary_keys_, path_factory, key_comparator_, /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, /*schema_id=*/0, value_schema_, options, - noop_compact_manager_, io_manager, pool_)); + noop_compact_manager_, io_manager, + /*enable_multi_thread_spill=*/false, pool_)); std::shared_ptr batch1 = arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ @@ -1178,13 +1182,16 @@ TEST_P(MergeTreeWriterTest, TestIntermediateMergeSpillFileBound) { .ValueOrDie(); WriteBatch(batch1, /*row_kinds=*/{}, merge_writer.get()); + // Level 0: [A], total = 1 ASSERT_EQ(1u, TestHelper::CountChannelFiles(file_system_, dir->Str() + "/tmp")); WriteBatch(batch2, /*row_kinds=*/{}, merge_writer.get()); + // Level 0: [A,B] hits max_fan_in=2, merge -> Level 0: [], Level 1: [C], total = 1 ASSERT_EQ(1u, TestHelper::CountChannelFiles(file_system_, dir->Str() + "/tmp")); WriteBatch(batch3, /*row_kinds=*/{}, merge_writer.get()); - ASSERT_EQ(1u, TestHelper::CountChannelFiles(file_system_, dir->Str() + "/tmp")); + // Level 0: [D], Level 1: [C], total = 2 (no single level exceeds max_fan_in) + ASSERT_EQ(2u, TestHelper::CountChannelFiles(file_system_, dir->Str() + "/tmp")); ASSERT_OK_AND_ASSIGN(CommitIncrement commit_increment, merge_writer->PrepareCommit(/*wait_compaction=*/false)); @@ -1203,7 +1210,7 @@ TEST_P(MergeTreeWriterTest, TestIntermediateMergeSpillFileBound) { CheckFileContent(expected_data_file_path, expected_array); } -TEST_P(MergeTreeWriterTest, TestDiskQuotaExhaustedFallsBackToFlushWriteBuffer) { +TEST_F(MergeTreeWriterTest, TestDiskQuotaExhaustedFallsBackToFlushWriteBuffer) { ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}, {Options::WRITE_BUFFER_SIZE, "1"}, @@ -1221,7 +1228,8 @@ TEST_P(MergeTreeWriterTest, TestDiskQuotaExhaustedFallsBackToFlushWriteBuffer) { MergeTreeWriter::Create(/*last_sequence_number=*/-1, primary_keys_, path_factory, key_comparator_, /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, /*schema_id=*/0, value_schema_, options, - noop_compact_manager_, io_manager, pool_)); + noop_compact_manager_, io_manager, + /*enable_multi_thread_spill=*/false, pool_)); // Phase 1: Manual FlushMemory path — disk quota exhausted causes fallback. std::shared_ptr array1 = @@ -1277,7 +1285,7 @@ TEST_P(MergeTreeWriterTest, TestDiskQuotaExhaustedFallsBackToFlushWriteBuffer) { } } -TEST_P(MergeTreeWriterTest, TestFlushMemoryQuotaExhaustedFallsBackToFlushWriteBuffer) { +TEST_F(MergeTreeWriterTest, TestFlushMemoryQuotaExhaustedFallsBackToFlushWriteBuffer) { // WRITE_BUFFER_SIZE is large enough so WriteBatch does NOT auto-spill. // SPILL_MAX_DISK_SIZE is tiny so the first FlushMemory() exhausts the quota, // triggering the fallback path: FlushMemory() -> quota exhausted -> FlushWriteBuffer. @@ -1298,7 +1306,8 @@ TEST_P(MergeTreeWriterTest, TestFlushMemoryQuotaExhaustedFallsBackToFlushWriteBu MergeTreeWriter::Create(/*last_sequence_number=*/-1, primary_keys_, path_factory, key_comparator_, /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, /*schema_id=*/0, value_schema_, options, - noop_compact_manager_, io_manager, pool_)); + noop_compact_manager_, io_manager, + /*enable_multi_thread_spill=*/false, pool_)); std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ @@ -1324,7 +1333,7 @@ TEST_P(MergeTreeWriterTest, TestFlushMemoryQuotaExhaustedFallsBackToFlushWriteBu ASSERT_OK(merge_writer->Close()); } -TEST_P(MergeTreeWriterTest, TestCloseDeletesSpillTempFiles) { +TEST_F(MergeTreeWriterTest, TestCloseDeletesSpillTempFiles) { ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}, {Options::WRITE_BUFFER_SIZE, "1"}, @@ -1341,7 +1350,8 @@ TEST_P(MergeTreeWriterTest, TestCloseDeletesSpillTempFiles) { MergeTreeWriter::Create(/*last_sequence_number=*/-1, primary_keys_, path_factory, key_comparator_, /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, /*schema_id=*/0, value_schema_, options, - noop_compact_manager_, io_manager, pool_)); + noop_compact_manager_, io_manager, + /*enable_multi_thread_spill=*/false, pool_)); std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ @@ -1356,7 +1366,7 @@ TEST_P(MergeTreeWriterTest, TestCloseDeletesSpillTempFiles) { ASSERT_EQ(0u, TestHelper::CountChannelFiles(file_system_, dir->Str() + "/tmp")); } -TEST_P(MergeTreeWriterTest, TestMultiplePrepareCommitWithSpill) { +TEST_F(MergeTreeWriterTest, TestMultiplePrepareCommitWithSpill) { ASSERT_OK_AND_ASSIGN( CoreOptions options, CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}, {Options::WRITE_ONLY, "true"}})); @@ -1373,7 +1383,8 @@ TEST_P(MergeTreeWriterTest, TestMultiplePrepareCommitWithSpill) { MergeTreeWriter::Create(/*last_sequence_number=*/-1, primary_keys_, path_factory, key_comparator_, /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, /*schema_id=*/0, value_schema_, options, - noop_compact_manager_, io_manager, pool_)); + noop_compact_manager_, io_manager, + /*enable_multi_thread_spill=*/false, pool_)); std::shared_ptr array1 = arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ @@ -1428,6 +1439,92 @@ TEST_P(MergeTreeWriterTest, TestMultiplePrepareCommitWithSpill) { ASSERT_OK(merge_writer->Close()); } +TEST_F(MergeTreeWriterTest, TestSpillWithIOException) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, "orc"}, + {Options::WRITE_BUFFER_SIZE, "1"}, + {Options::WRITE_ONLY, "true"}, + {Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, "2"}})); + + bool run_complete = false; + auto io_hook = IOHook::GetInstance(); + for (size_t i = 0; i < 2000; i++) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), "orc", options.DataFilePrefix(), nullptr)); + + std::shared_ptr io_manager = + std::make_shared(dir->Str() + "/tmp", file_system_); + ASSERT_OK_AND_ASSIGN( + auto merge_writer, + MergeTreeWriter::Create(/*last_sequence_number=*/-1, primary_keys_, path_factory, + key_comparator_, /*user_defined_seq_comparator=*/nullptr, + merge_function_wrapper_, /*schema_id=*/0, value_schema_, + options, noop_compact_manager_, io_manager, + /*enable_multi_thread_spill=*/false, pool_)); + + ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); + io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); + // Write 4 batches, each with 2 rows sharing the same key to exercise deduplication. + // Batch 1: triggers spill file 1 + std::shared_ptr batch1 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 1, 0, 1.0], + ["Bob", 2, 0, 2.0] + ])") + .ValueOrDie(); + auto b1 = CreateBatch(batch1, {}); + CHECK_HOOK_STATUS(merge_writer->Write(std::move(b1)), i); + + // Batch 2: triggers spill file 2 → intermediate merge (merge 2 files into 1) + std::shared_ptr batch2 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Alice", 10, 0, 10.0], + ["Charlie", 3, 0, 3.0] + ])") + .ValueOrDie(); + auto b2 = CreateBatch(batch2, {}); + CHECK_HOOK_STATUS(merge_writer->Write(std::move(b2)), i); + + // Batch 3: triggers spill file at level 0 again + std::shared_ptr batch3 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Bob", 20, 0, 20.0], + ["Dave", 4, 0, 4.0] + ])") + .ValueOrDie(); + auto b3 = CreateBatch(batch3, {}); + CHECK_HOOK_STATUS(merge_writer->Write(std::move(b3)), i); + + // Batch 4: triggers spill file at level 0 → another merge at level 0, + // then level 1 has 2 files → merge at level 1 as well. + std::shared_ptr batch4 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Charlie", 30, 0, 30.0], + ["Eve", 5, 0, 5.0] + ])") + .ValueOrDie(); + auto b4 = CreateBatch(batch4, {}); + CHECK_HOOK_STATUS(merge_writer->Write(std::move(b4)), i); + + // PrepareCommit: triggers FlushWriteBuffer → CreateReaders (RunFinalCleanupIfNeeded) + // → sort merge → write output data file + auto commit_increment = merge_writer->PrepareCommit(/*wait_compaction=*/false); + CHECK_HOOK_STATUS(commit_increment.status(), i); + ASSERT_FALSE(commit_increment.value().GetNewFilesIncrement().NewFiles().empty()); + + // Verify deduplication: Alice(seq=2), Bob(seq=4), Charlie(seq=5), Dave(seq=6), Eve(seq=7) + ASSERT_EQ(1, commit_increment.value().GetNewFilesIncrement().NewFiles().size()); + ASSERT_EQ(5, commit_increment.value().GetNewFilesIncrement().NewFiles()[0]->row_count); + + ASSERT_OK(merge_writer->Close()); + run_complete = true; + break; + } + ASSERT_TRUE(run_complete); +} + INSTANTIATE_TEST_SUITE_P(WithOptionalIOManager, MergeTreeWriterTest, ::testing::Values(false, true)); diff --git a/src/paimon/core/mergetree/sort_buffer_test.cpp b/src/paimon/core/mergetree/sort_buffer_test.cpp index 1c60c2272..5098804f7 100644 --- a/src/paimon/core/mergetree/sort_buffer_test.cpp +++ b/src/paimon/core/mergetree/sort_buffer_test.cpp @@ -21,6 +21,7 @@ #include #include "arrow/api.h" +#include "arrow/c/bridge.h" #include "arrow/ipc/json_simple.h" #include "gtest/gtest.h" #include "paimon/common/types/data_field.h" @@ -139,7 +140,7 @@ class SortBufferTest : public ::testing::Test { /*sequence_fields_ascending=*/true, key_comparator_, write_buffer_size, pool_); return ExternalSortBuffer::Create(std::move(in_memory_buffer), value_schema_, primary_keys_, key_comparator_, sequence_comparator_, options, - io_manager_, pool_); + io_manager_, /*enable_multi_thread_spill=*/false, pool_); } void AssertRows(const std::vector& actual, @@ -280,6 +281,43 @@ TEST_F(SortBufferTest, TestInMemorySortBufferSimple) { ASSERT_EQ(buffer.GetMemorySize(), 0); } +TEST_F(SortBufferTest, TestInMemorySortBufferEstimateMemoryUseForEachRow) { + InMemorySortBuffer buffer(/*last_sequence_number=*/9, value_type_, primary_keys_, + sequence_fields_, /*sequence_fields_ascending=*/true, key_comparator_, + /*write_buffer_size=*/1024 * 1024, pool_); + + ASSERT_EQ(buffer.GetEstimateMemoryUseForEachRow(), 0); + + uint64_t cached_memory_use_per_row = 0; + for (int32_t index = 0; index < 3; ++index) { + std::vector input_rows; + input_rows.push_back(MakeRow(RowKind::Insert(), std::string(index + 1, 'a'), index, index)); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, buffer.Write(MakeBatch(input_rows))); + ASSERT_TRUE(has_remaining_quota); + + cached_memory_use_per_row = buffer.GetMemorySize() / (index + 1); + ASSERT_EQ(buffer.GetEstimateMemoryUseForEachRow(), cached_memory_use_per_row); + } + + // Clear does not reset the estimated per-row memory usage. + buffer.Clear(); + ASSERT_EQ(buffer.GetEstimateMemoryUseForEachRow(), cached_memory_use_per_row); + + // Verify behavior when writing an empty batch. + std::shared_ptr empty_array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([])").ValueOrDie(); + ::ArrowArray c_array; + ASSERT_TRUE(arrow::ExportArray(*empty_array, &c_array).ok()); + RecordBatchBuilder batch_builder(&c_array); + batch_builder.SetRowKinds({}); + ASSERT_OK_AND_ASSIGN(std::unique_ptr empty_batch, batch_builder.Finish()); + + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, buffer.Write(std::move(empty_batch))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_EQ(buffer.GetEstimateMemoryUseForEachRow(), cached_memory_use_per_row); +} + TEST_F(SortBufferTest, TestExternalSortBufferWithInMemoryDataAndNoSpill) { ASSERT_OK_AND_ASSIGN(auto buffer, CreateExternalSortBuffer(/*last_sequence_number=*/4, /*write_buffer_size=*/1024 * 1024)); diff --git a/src/paimon/core/mergetree/spill_file_merger.cpp b/src/paimon/core/mergetree/spill_file_merger.cpp new file mode 100644 index 000000000..2bc9ff6a3 --- /dev/null +++ b/src/paimon/core/mergetree/spill_file_merger.cpp @@ -0,0 +1,160 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/spill_file_merger.h" + +#include +#include + +namespace paimon { + +SpillFileMerger::SpillFileMerger(int32_t max_fan_in) : max_fan_in_(max_fan_in) { + assert(max_fan_in >= 2); +} + +void SpillFileMerger::SetMaxFanIn(int32_t max_fan_in) { + assert(max_fan_in >= 2); + max_fan_in_ = max_fan_in; +} + +void SpillFileMerger::Clear() { + levels_.clear(); +} + +void SpillFileMerger::AddFile(const FileChannelInfo& file_info) { + EnsureLevel(0); + levels_[0].push_back(file_info); +} + +Status SpillFileMerger::RunMergeIfNeeded(const MergeFn& merge_fn) { + while (NeedMerge()) { + auto task = PickMergeTask(); + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo output, merge_fn(task.input_files)); + ApplyMergeResult(task, output); + } + return Status::OK(); +} + +Status SpillFileMerger::RunFinalMergeIfNeeded(int32_t target_file_count, const MergeFn& merge_fn) { + while (GetTotalFileCount() > target_file_count) { + auto task = PickFinalMergeBatch(target_file_count); + PAIMON_ASSIGN_OR_RAISE(FileChannelInfo output, merge_fn(task.input_files)); + ApplyMergeResult(task, output); + } + return Status::OK(); +} + +bool SpillFileMerger::NeedMerge() const { + for (const auto& level : levels_) { + if (static_cast(level.size()) >= max_fan_in_) { + return true; + } + } + return false; +} + +void SpillFileMerger::ApplyMergeResult(const MergeTask& task, const FileChannelInfo& output) { + for (const auto& file : task.input_files) { + RemoveFile(file.channel_id); + } + EnsureLevel(task.target_level); + levels_[task.target_level].push_back(output); +} + +SpillFileMerger::MergeTask SpillFileMerger::PickMergeTask() const { + for (int32_t i = 0; i < static_cast(levels_.size()); ++i) { + if (static_cast(levels_[i].size()) >= max_fan_in_) { + MergeTask task; + task.target_level = i + 1; + task.input_files.assign(levels_[i].begin(), levels_[i].begin() + max_fan_in_); + return task; + } + } + assert(false && "PickMergeTask called but no pending merge"); + return {}; +} + +SpillFileMerger::MergeTask SpillFileMerger::PickFinalMergeBatch(int32_t target_file_count) const { + int32_t total = GetTotalFileCount(); + assert(total > target_file_count); + + // Collect all files with their levels, sort by size ascending. + struct LeveledFile { + int32_t level; + FileChannelInfo entry; + }; + std::vector all_files; + for (int32_t level_idx = 0; level_idx < static_cast(levels_.size()); ++level_idx) { + for (const auto& file : levels_[level_idx]) { + all_files.push_back({level_idx, file}); + } + } + std::sort(all_files.begin(), all_files.end(), + [](const LeveledFile& lhs, const LeveledFile& rhs) { + return lhs.entry.file_size < rhs.entry.file_size; + }); + + // Merge `files_to_merge` (alias: n) files into 1 eliminates (n-1) files. + // Need to eliminate (total - target_file_count), so n = total - target_file_count + 1. + // Bounded by max_fan_in_ (max merge width per round). + int32_t files_to_merge = std::min(total - target_file_count + 1, max_fan_in_); + + MergeTask task; + int32_t max_level = 0; + for (int32_t i = 0; i < files_to_merge; ++i) { + max_level = std::max(max_level, all_files[i].level); + task.input_files.push_back(all_files[i].entry); + } + task.target_level = max_level + 1; + return task; +} + +std::vector SpillFileMerger::GetAllFiles() const { + std::vector result; + for (const auto& level : levels_) { + for (const auto& file : level) { + result.push_back(file); + } + } + return result; +} + +int32_t SpillFileMerger::GetTotalFileCount() const { + int32_t total = 0; + for (const auto& level : levels_) { + total += static_cast(level.size()); + } + return total; +} + +void SpillFileMerger::EnsureLevel(int32_t level) { + while (static_cast(levels_.size()) <= level) { + levels_.emplace_back(); + } +} + +void SpillFileMerger::RemoveFile(const FileIOChannel::ID& channel_id) { + for (auto& level : levels_) { + for (auto it = level.begin(); it != level.end(); ++it) { + if (it->channel_id == channel_id) { + level.erase(it); + return; + } + } + } +} + +} // namespace paimon diff --git a/src/paimon/core/mergetree/spill_file_merger.h b/src/paimon/core/mergetree/spill_file_merger.h new file mode 100644 index 000000000..8f020ef6b --- /dev/null +++ b/src/paimon/core/mergetree/spill_file_merger.h @@ -0,0 +1,81 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/core/disk/file_io_channel.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { + +/// Manages spill files in a leveled structure (similar to LSM tree) to minimize read/write +/// amplification during external sort merge operations. +/// +/// Files are organized into levels. Level 0 contains the original spill files. When a level +/// accumulates max_fan_in files, they are merged into a single file at the next level. Before the +/// final read, a greedy merge reduces the total file count to <= max_fan_in. +/// +/// Read/write amplification: O(log_K(N)) vs O(N/K) for naive sequential merge. +class SpillFileMerger { + public: + using MergeFn = std::function(const std::vector&)>; + + explicit SpillFileMerger(int32_t max_fan_in); + + /// Update the maximum fan-in (merge width). + void SetMaxFanIn(int32_t max_fan_in); + + /// Remove all files from all levels. + void Clear(); + + /// Add a new spill file to level 0. + void AddFile(const FileChannelInfo& file_info); + + /// Merge any single level that has accumulated >= max_fan_in files into one file at the next + /// level. Repeats until every level has fewer than max_fan_in files. + Status RunMergeIfNeeded(const MergeFn& merge_fn); + + /// Reduce the total file count across all levels to <= target_file_count by greedily merging + /// the smallest files first. Each round merges at most max_fan_in files. + Status RunFinalMergeIfNeeded(int32_t target_file_count, const MergeFn& merge_fn); + + /// Collect all files across all levels into a flat vector. + std::vector GetAllFiles() const; + + private: + struct MergeTask { + int32_t target_level; + std::vector input_files; + }; + + bool NeedMerge() const; + void ApplyMergeResult(const MergeTask& task, const FileChannelInfo& output); + MergeTask PickMergeTask() const; + MergeTask PickFinalMergeBatch(int32_t target_file_count) const; + int32_t GetTotalFileCount() const; + void EnsureLevel(int32_t level); + void RemoveFile(const FileIOChannel::ID& channel_id); + + int32_t max_fan_in_; + std::vector> levels_; +}; + +} // namespace paimon diff --git a/src/paimon/core/mergetree/spill_file_merger_test.cpp b/src/paimon/core/mergetree/spill_file_merger_test.cpp new file mode 100644 index 000000000..a3e12e3e8 --- /dev/null +++ b/src/paimon/core/mergetree/spill_file_merger_test.cpp @@ -0,0 +1,287 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/mergetree/spill_file_merger.h" + +#include + +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class SpillFileMergerTest : public ::testing::Test { + protected: + FileChannelInfo MakeFile(int32_t id, int64_t size) { + return FileChannelInfo{FileIOChannel::ID(std::to_string(id)), size}; + } + + SpillFileMerger::MergeFn CreateMockMergeFn() { + return [this](const std::vector& inputs) -> Result { + merge_call_count_++; + int64_t total_size = 0; + for (const auto& file : inputs) { + total_size += file.file_size; + } + return MakeFile(next_file_id_++, total_size); + }; + } + + SpillFileMerger::MergeFn CreateFailingMergeFn() { + return [this](const std::vector&) -> Result { + merge_call_count_++; + return Status::IOError("simulated write failure"); + }; + } + + int32_t merge_call_count_ = 0; + int32_t next_file_id_ = 1000; +}; + +TEST_F(SpillFileMergerTest, NoMergeBelowFanIn) { + SpillFileMerger merger(4); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + merger.AddFile(MakeFile(3, 300)); + + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 0); + ASSERT_EQ(merger.GetAllFiles().size(), 3); +} + +TEST_F(SpillFileMergerTest, MergeTriggeredAtFanIn) { + SpillFileMerger merger(3); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + merger.AddFile(MakeFile(3, 300)); + + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); + + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files[0].file_size, 600); +} + +TEST_F(SpillFileMergerTest, MinimalFanInTwo) { + SpillFileMerger merger(2); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); + + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files[0].file_size, 300); +} + +TEST_F(SpillFileMergerTest, MultiLevelMerge) { + SpillFileMerger merger(2); + + // Adding 4 files with fan_in=2 should trigger multi-level merge: + // Add file 1,2 -> merge to level 1 (1 file at level 1) + // Add file 3,4 -> merge level 0 to level 1 (2 files at level 1) -> merge level 1 + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 100)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); + + merger.AddFile(MakeFile(3, 100)); + merger.AddFile(MakeFile(4, 100)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + // level 0 merge + level 1 merge + ASSERT_EQ(merge_call_count_, 3); + + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files[0].file_size, 400); +} + +TEST_F(SpillFileMergerTest, ManyFilesWithFanInTwo) { + SpillFileMerger merger(2); + + for (int32_t i = 0; i < 8; ++i) { + merger.AddFile(MakeFile(i, 100)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + } + + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 1); + ASSERT_EQ(files[0].file_size, 800); +} + +TEST_F(SpillFileMergerTest, FinalCleanupReducesFileCount) { + SpillFileMerger merger(4); + + // Add 5 files (just above fan_in). Level 0 gets merged once, leaving: + // level 0: 1 file, level 1: 1 file + for (int32_t i = 0; i < 5; ++i) { + merger.AddFile(MakeFile(i, 100)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + } + + auto files_before = merger.GetAllFiles(); + ASSERT_EQ(files_before.size(), 2); + + ASSERT_OK(merger.RunFinalMergeIfNeeded(1, CreateMockMergeFn())); + + auto files_after = merger.GetAllFiles(); + ASSERT_EQ(files_after.size(), 1); +} + +TEST_F(SpillFileMergerTest, FinalCleanupMergesSmallestFirst) { + SpillFileMerger merger(10); + + merger.AddFile(MakeFile(1, 1000)); + merger.AddFile(MakeFile(2, 10)); + merger.AddFile(MakeFile(3, 20)); + merger.AddFile(MakeFile(4, 500)); + + // target=2, need to eliminate 2 files, so merge 3 smallest into 1 + ASSERT_OK(merger.RunFinalMergeIfNeeded(2, CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); + + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 2); + + // The 3 smallest (10, 20, 500) should be merged into one 530-sized file, + // leaving the largest (1000) untouched. + std::vector sizes; + for (const auto& file : files) { + sizes.push_back(file.file_size); + } + std::sort(sizes.begin(), sizes.end()); + ASSERT_EQ(sizes[0], 530); + ASSERT_EQ(sizes[1], 1000); +} + +TEST_F(SpillFileMergerTest, FinalCleanupNoOpWhenAlreadyBelowTarget) { + SpillFileMerger merger(4); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + + ASSERT_OK(merger.RunFinalMergeIfNeeded(3, CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 0); + ASSERT_EQ(merger.GetAllFiles().size(), 2); +} + +TEST_F(SpillFileMergerTest, FinalCleanupConvergesToTarget) { + // Add many files without running merge (fan_in large enough) + SpillFileMerger merger(100); + for (int32_t i = 0; i < 20; ++i) { + merger.AddFile(MakeFile(i, (i + 1) * 10)); + } + ASSERT_EQ(merger.GetAllFiles().size(), 20); + + ASSERT_OK(merger.RunFinalMergeIfNeeded(3, CreateMockMergeFn())); + ASSERT_LE(static_cast(merger.GetAllFiles().size()), 3); +} + +TEST_F(SpillFileMergerTest, MergeFnFailurePreservesState) { + SpillFileMerger merger(2); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + + auto status = merger.RunMergeIfNeeded(CreateFailingMergeFn()); + ASSERT_FALSE(status.ok()); + ASSERT_EQ(merge_call_count_, 1); + + // Files should still be present since merge failed + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 2); +} + +TEST_F(SpillFileMergerTest, ClearRemovesAllFiles) { + SpillFileMerger merger(4); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + merger.AddFile(MakeFile(3, 300)); + + merger.Clear(); + ASSERT_EQ(merger.GetAllFiles().size(), 0); +} + +TEST_F(SpillFileMergerTest, SetMaxFanInAffectsMerge) { + SpillFileMerger merger(4); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + merger.AddFile(MakeFile(3, 300)); + + // No merge at fan_in=4 + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 0); + + // Lower fan_in to 3, now merge should trigger + merger.SetMaxFanIn(3); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); + ASSERT_EQ(merger.GetAllFiles().size(), 1); +} + +TEST_F(SpillFileMergerTest, SetMaxFanInToLargerValueSuppressesMerge) { + SpillFileMerger merger(3); + + merger.AddFile(MakeFile(1, 100)); + merger.AddFile(MakeFile(2, 200)); + merger.AddFile(MakeFile(3, 300)); + + // At fan_in=3, merge should trigger + // But first, increase fan_in to 5 before running merge + merger.SetMaxFanIn(5); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 0); + + // All 3 files should still be present + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 3); + + // Add more files up to 5, still no merge + merger.AddFile(MakeFile(4, 400)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 0); + ASSERT_EQ(merger.GetAllFiles().size(), 4); + + // Add 5th file, now merge triggers + merger.AddFile(MakeFile(5, 500)); + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + ASSERT_EQ(merge_call_count_, 1); +} + +TEST_F(SpillFileMergerTest, MergeOnlyTakesFanInFilesFromLevel) { + SpillFileMerger merger(3); + + // Add 5 files to level 0 (exceeds fan_in=3) + for (int32_t i = 0; i < 5; ++i) { + merger.AddFile(MakeFile(i, 100)); + } + + ASSERT_OK(merger.RunMergeIfNeeded(CreateMockMergeFn())); + + // First merge takes 3 from level 0 -> 1 at level 1 + // Remaining: 2 at level 0, 1 at level 1 = 3 total + auto files = merger.GetAllFiles(); + ASSERT_EQ(files.size(), 3); +} + +} // namespace paimon::test diff --git a/src/paimon/core/mergetree/spill_reader.cpp b/src/paimon/core/mergetree/spill_reader.cpp index cb1eb987e..7b78782ee 100644 --- a/src/paimon/core/mergetree/spill_reader.cpp +++ b/src/paimon/core/mergetree/spill_reader.cpp @@ -28,20 +28,22 @@ namespace paimon { SpillReader::SpillReader(const std::shared_ptr& fs, const std::shared_ptr& key_schema, - const std::shared_ptr& value_schema, + const std::shared_ptr& value_schema, bool use_threads, const std::shared_ptr& pool) : fs_(fs), key_schema_(key_schema), value_schema_(value_schema), pool_(pool), arrow_pool_(GetArrowPool(pool)), + use_threads_(use_threads), metrics_(std::make_shared()) {} Result> SpillReader::Create( const std::shared_ptr& fs, const std::shared_ptr& key_schema, - const std::shared_ptr& value_schema, const std::shared_ptr& pool, - const FileIOChannel::ID& channel_id) { - std::unique_ptr reader(new SpillReader(fs, key_schema, value_schema, pool)); + const std::shared_ptr& value_schema, bool use_threads, + const FileIOChannel::ID& channel_id, const std::shared_ptr& pool) { + std::unique_ptr reader( + new SpillReader(fs, key_schema, value_schema, use_threads, pool)); PAIMON_RETURN_NOT_OK(reader->Open(channel_id)); return reader; } @@ -55,6 +57,7 @@ Status SpillReader::Open(const FileIOChannel::ID& channel_id) { std::make_shared(in_stream_, arrow_pool_, file_len); auto ipc_read_options = arrow::ipc::IpcReadOptions::Defaults(); ipc_read_options.memory_pool = arrow_pool_.get(); + ipc_read_options.use_threads = use_threads_; PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( arrow_reader_, arrow::ipc::RecordBatchFileReader::Open(arrow_input_stream_adapter_, ipc_read_options)); diff --git a/src/paimon/core/mergetree/spill_reader.h b/src/paimon/core/mergetree/spill_reader.h index 1d4e109b5..e132bc0ad 100644 --- a/src/paimon/core/mergetree/spill_reader.h +++ b/src/paimon/core/mergetree/spill_reader.h @@ -40,8 +40,8 @@ class SpillReader : public KeyValueRecordReader { public: static Result> Create( const std::shared_ptr& fs, const std::shared_ptr& key_schema, - const std::shared_ptr& value_schema, const std::shared_ptr& pool, - const FileIOChannel::ID& channel_id); + const std::shared_ptr& value_schema, bool use_threads, + const FileIOChannel::ID& channel_id, const std::shared_ptr& pool); SpillReader(const SpillReader&) = delete; SpillReader& operator=(const SpillReader&) = delete; @@ -64,7 +64,7 @@ class SpillReader : public KeyValueRecordReader { private: SpillReader(const std::shared_ptr& fs, const std::shared_ptr& key_schema, - const std::shared_ptr& value_schema, + const std::shared_ptr& value_schema, bool use_threads, const std::shared_ptr& pool); Status Open(const FileIOChannel::ID& channel_id); @@ -75,6 +75,7 @@ class SpillReader : public KeyValueRecordReader { std::shared_ptr value_schema_; std::shared_ptr pool_; std::shared_ptr arrow_pool_; + bool use_threads_; std::shared_ptr metrics_; std::shared_ptr in_stream_; diff --git a/src/paimon/core/mergetree/spill_reader_writer_test.cpp b/src/paimon/core/mergetree/spill_reader_writer_test.cpp index 189205491..2587a1479 100644 --- a/src/paimon/core/mergetree/spill_reader_writer_test.cpp +++ b/src/paimon/core/mergetree/spill_reader_writer_test.cpp @@ -66,7 +66,7 @@ class SpillReaderWriterTest : public ::testing::TestWithParam { Result> CreateSpillWriter() const { return SpillWriter::Create(file_system_, write_schema_, channel_enumerator_, spill_channel_manager_, GetParam(), /*compression_level=*/1, - write_pool_); + /*use_threads=*/false, write_pool_); } FileIOChannel::ID WriteSpillFile( @@ -81,8 +81,8 @@ class SpillReaderWriterTest : public ::testing::TestWithParam { Result> CreateSpillReader( const FileIOChannel::ID& channel_id) const { - return SpillReader::Create(file_system_, key_schema_, value_schema_, read_pool_, - channel_id); + return SpillReader::Create(file_system_, key_schema_, value_schema_, /*use_threads=*/false, + channel_id, read_pool_); } protected: @@ -288,7 +288,7 @@ TEST_P(SpillReaderWriterTest, TestReaderSchemaMismatchErrors) { auto wrong_key_schema = arrow::schema({arrow::field("nonexistent_key", arrow::utf8())}); ASSERT_OK_AND_ASSIGN(auto reader, SpillReader::Create(file_system_, wrong_key_schema, value_schema_, - read_pool_, channel_id)); + /*use_threads=*/false, channel_id, read_pool_)); ASSERT_NOK_WITH_MSG(reader->NextBatch(), "cannot find key field nonexistent_key in spill file"); } @@ -297,7 +297,7 @@ TEST_P(SpillReaderWriterTest, TestReaderSchemaMismatchErrors) { {arrow::field("f0", arrow::utf8()), arrow::field("nonexistent_value", arrow::int32())}); ASSERT_OK_AND_ASSIGN(auto reader, SpillReader::Create(file_system_, key_schema_, wrong_value_schema, - read_pool_, channel_id)); + /*use_threads=*/false, channel_id, read_pool_)); ASSERT_NOK_WITH_MSG(reader->NextBatch(), "cannot find value field nonexistent_value in spill file"); } diff --git a/src/paimon/core/mergetree/spill_writer.cpp b/src/paimon/core/mergetree/spill_writer.cpp index a0677801a..9948b038e 100644 --- a/src/paimon/core/mergetree/spill_writer.cpp +++ b/src/paimon/core/mergetree/spill_writer.cpp @@ -30,24 +30,25 @@ SpillWriter::SpillWriter(const std::shared_ptr& fs, const std::shared_ptr& channel_enumerator, const std::shared_ptr& spill_channel_manager, const std::string& compression, int32_t compression_level, - const std::shared_ptr& pool) + bool use_threads, const std::shared_ptr& pool) : fs_(fs), schema_(schema), channel_enumerator_(channel_enumerator), spill_channel_manager_(spill_channel_manager), compression_(compression), compression_level_(compression_level), + use_threads_(use_threads), arrow_pool_(GetArrowPool(pool)) {} Result> SpillWriter::Create( const std::shared_ptr& fs, const std::shared_ptr& schema, const std::shared_ptr& channel_enumerator, const std::shared_ptr& spill_channel_manager, - const std::string& compression, int32_t compression_level, + const std::string& compression, int32_t compression_level, bool use_threads, const std::shared_ptr& pool) { std::unique_ptr writer(new SpillWriter(fs, schema, channel_enumerator, spill_channel_manager, compression, - compression_level, pool)); + compression_level, use_threads, pool)); PAIMON_RETURN_NOT_OK(writer->Open()); return writer; } @@ -56,6 +57,7 @@ Status SpillWriter::Open() { channel_id_ = channel_enumerator_->Next(); auto ipc_write_options = arrow::ipc::IpcWriteOptions::Defaults(); ipc_write_options.memory_pool = arrow_pool_.get(); + ipc_write_options.use_threads = use_threads_; auto cleanup_guard = ScopeGuard([&]() { arrow_writer_.reset(); arrow_output_stream_adapter_.reset(); diff --git a/src/paimon/core/mergetree/spill_writer.h b/src/paimon/core/mergetree/spill_writer.h index a0e55d537..abe29ae8b 100644 --- a/src/paimon/core/mergetree/spill_writer.h +++ b/src/paimon/core/mergetree/spill_writer.h @@ -42,7 +42,7 @@ class SpillWriter { const std::shared_ptr& fs, const std::shared_ptr& schema, const std::shared_ptr& channel_enumerator, const std::shared_ptr& spill_channel_manager, - const std::string& compression, int32_t compression_level, + const std::string& compression, int32_t compression_level, bool use_threads, const std::shared_ptr& pool); SpillWriter(const SpillWriter&) = delete; @@ -57,7 +57,7 @@ class SpillWriter { SpillWriter(const std::shared_ptr& fs, const std::shared_ptr& schema, const std::shared_ptr& channel_enumerator, const std::shared_ptr& spill_channel_manager, - const std::string& compression, int32_t compression_level, + const std::string& compression, int32_t compression_level, bool use_threads, const std::shared_ptr& pool); Status Open(); @@ -68,6 +68,7 @@ class SpillWriter { std::shared_ptr spill_channel_manager_; std::string compression_; int32_t compression_level_; + bool use_threads_; std::shared_ptr out_stream_; std::shared_ptr arrow_output_stream_adapter_; std::unique_ptr arrow_pool_; diff --git a/src/paimon/core/mergetree/write_buffer.cpp b/src/paimon/core/mergetree/write_buffer.cpp index c34898989..5c9f4d509 100644 --- a/src/paimon/core/mergetree/write_buffer.cpp +++ b/src/paimon/core/mergetree/write_buffer.cpp @@ -36,7 +36,7 @@ Result> WriteBuffer::Create( const std::shared_ptr& user_defined_seq_comparator, const std::shared_ptr>& merge_function_wrapper, const CoreOptions& options, const std::shared_ptr& io_manager, - const std::shared_ptr& pool) { + bool enable_multi_thread_spill, const std::shared_ptr& pool) { auto value_type = arrow::struct_(value_schema->fields()); auto in_memory_buffer = std::make_unique( last_sequence_number, value_type, trimmed_primary_keys, user_defined_sequence_fields, @@ -47,10 +47,10 @@ Result> WriteBuffer::Create( sort_buffer = std::move(in_memory_buffer); } else { PAIMON_ASSIGN_OR_RAISE( - sort_buffer, - ExternalSortBuffer::Create(std::move(in_memory_buffer), value_schema, - trimmed_primary_keys, key_comparator, - user_defined_seq_comparator, options, io_manager, pool)); + sort_buffer, ExternalSortBuffer::Create(std::move(in_memory_buffer), value_schema, + trimmed_primary_keys, key_comparator, + user_defined_seq_comparator, options, + io_manager, enable_multi_thread_spill, pool)); } return std::unique_ptr( new WriteBuffer(std::move(sort_buffer), key_comparator, merge_function_wrapper)); diff --git a/src/paimon/core/mergetree/write_buffer.h b/src/paimon/core/mergetree/write_buffer.h index 7c66dff95..ff0369ad1 100644 --- a/src/paimon/core/mergetree/write_buffer.h +++ b/src/paimon/core/mergetree/write_buffer.h @@ -58,7 +58,7 @@ class WriteBuffer { const std::shared_ptr& user_defined_seq_comparator, const std::shared_ptr>& merge_function_wrapper, const CoreOptions& options, const std::shared_ptr& io_manager, - const std::shared_ptr& pool); + bool enable_multi_thread_spill, const std::shared_ptr& pool); /// Import a RecordBatch into the buffer. /// Return false when the batch was accepted but the caller should fall back to diff --git a/src/paimon/core/mergetree/write_buffer_test.cpp b/src/paimon/core/mergetree/write_buffer_test.cpp index 90c35727d..d496dc95f 100644 --- a/src/paimon/core/mergetree/write_buffer_test.cpp +++ b/src/paimon/core/mergetree/write_buffer_test.cpp @@ -78,7 +78,7 @@ class WriteBufferTest : public ::testing::Test { WriteBuffer::Create(last_sequence_number, value_schema_, primary_keys_, /*user_defined_sequence_fields=*/{}, key_comparator_, /*user_defined_seq_comparator=*/nullptr, merge_function_wrapper_, - options, io_manager_, pool_)); + options, io_manager_, /*enable_multi_thread_spill=*/false, pool_)); return write_buffer; } @@ -147,12 +147,12 @@ TEST_F(WriteBufferTest, TestFlushResetsStateAndAdvancesSequenceNumber) { ])") .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(bool buffered1, + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, write_buffer->Write(CreateBatch(array1, /*row_kinds=*/{}))); - ASSERT_TRUE(buffered1); - ASSERT_OK_AND_ASSIGN(bool buffered2, + ASSERT_TRUE(has_remaining_quota); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); - ASSERT_TRUE(buffered2); + ASSERT_TRUE(has_remaining_quota); ASSERT_FALSE(write_buffer->IsEmpty()); ASSERT_GT(write_buffer->GetMemoryUsage(), 0); @@ -194,8 +194,9 @@ TEST_F(WriteBufferTest, TestFlushPreservesRowKinds) { RecordBatch::RowKind::DELETE, }; - ASSERT_OK_AND_ASSIGN(bool buffered, write_buffer->Write(CreateBatch(array, row_kinds))); - ASSERT_TRUE(buffered); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array, row_kinds))); + ASSERT_TRUE(has_remaining_quota); ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); ASSERT_EQ(readers.size(), 1); @@ -221,8 +222,9 @@ TEST_F(WriteBufferTest, TestWriteRequestsFlushWriteBufferWhenSpillDisabled) { ])") .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(bool buffered, write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); - ASSERT_FALSE(buffered); + ASSERT_OK_AND_ASSIGN(bool has_remaining_memory, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_FALSE(has_remaining_memory); ASSERT_FALSE(write_buffer->IsEmpty()); ASSERT_GT(write_buffer->GetMemoryUsage(), 0); } @@ -238,9 +240,7 @@ TEST_F(WriteBufferTest, TestSpillDiskQuotaEnforcement) { CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "1"}, {Options::WRITE_BUFFER_SPILLABLE, "true"}})); auto ref_write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, ref_options); - ASSERT_OK_AND_ASSIGN(bool ref_ok, - ref_write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); - ASSERT_TRUE(ref_ok); + ASSERT_OK(ref_write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); ASSERT_OK_AND_ASSIGN(int64_t spill_file_size, GetOnlySpillFileSize()); ref_write_buffer->Clear(); @@ -252,11 +252,11 @@ TEST_F(WriteBufferTest, TestSpillDiskQuotaEnforcement) { std::to_string(spill_file_size)}})); auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); - ASSERT_OK_AND_ASSIGN(bool has_quota, + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); - ASSERT_TRUE(has_quota); - ASSERT_OK_AND_ASSIGN(has_quota, write_buffer->FlushMemory()); - ASSERT_FALSE(has_quota); + ASSERT_TRUE(has_remaining_quota); + ASSERT_OK_AND_ASSIGN(bool has_remaining_disk, write_buffer->FlushMemory()); + ASSERT_FALSE(has_remaining_disk); // write_buffer is not empty because spilled data on disk still belongs to the buffer. ASSERT_FALSE(write_buffer->IsEmpty()); ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); @@ -271,9 +271,9 @@ TEST_F(WriteBufferTest, TestSpillDiskQuotaEnforcement) { std::to_string(spill_file_size)}})); auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); - ASSERT_OK_AND_ASSIGN(bool has_quota, + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); - ASSERT_FALSE(has_quota); + ASSERT_FALSE(has_remaining_quota); // write_buffer is not empty because spilled data on disk still belongs to the buffer. ASSERT_FALSE(write_buffer->IsEmpty()); ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); @@ -295,15 +295,23 @@ TEST_F(WriteBufferTest, TestSpillDiskQuotaEnforcement) { ])") .ValueOrDie(); - // Write 1: under quota → true. - ASSERT_OK_AND_ASSIGN(bool result1, + // Write 1: under disk quota → true. + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); - ASSERT_TRUE(result1); + ASSERT_TRUE(has_remaining_quota); - // Write 2: quota exhausted → false. - ASSERT_OK_AND_ASSIGN(bool result2, + // Write 2: WRITE_BUFFER_SIZE=1 causes UpdateSpillParameters() to clamp actual_max_fan_in_ + // to 2, triggering intermediate merge which reduces total_spill_disk_bytes_ below disk + // quota. So quota is NOT exhausted here. + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); - ASSERT_FALSE(result2); + ASSERT_TRUE(has_remaining_quota); + + // Write 3: spill adds a new file to level 0, but no merge is triggered (each level has + // fewer than fan_in files), so total disk usage exceeds quota → returns false. + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); + ASSERT_FALSE(has_remaining_quota); ASSERT_FALSE(write_buffer->IsEmpty()); ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); @@ -314,7 +322,7 @@ TEST_F(WriteBufferTest, TestSpillDiskQuotaEnforcement) { result.sequence_numbers.end()); } std::sort(all_sequence_numbers.begin(), all_sequence_numbers.end()); - ASSERT_EQ(all_sequence_numbers, (std::vector{0, 1})); + ASSERT_EQ(all_sequence_numbers, (std::vector{0, 2})); } } @@ -330,8 +338,9 @@ TEST_F(WriteBufferTest, TestCreateReadersMergesSingleInMemoryReaderLocally) { ])") .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(bool buffered, write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); - ASSERT_TRUE(buffered); + ASSERT_OK_AND_ASSIGN(bool has_remaining_memory, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_memory); ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); ASSERT_EQ(readers.size(), 1); @@ -362,15 +371,15 @@ TEST_F(WriteBufferTest, TestCreateReadersReturnsBothSpillAndMemoryReaders) { ])") .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(bool buffered1, + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, write_buffer->Write(CreateBatch(spill_array, /*row_kinds=*/{}))); - ASSERT_TRUE(buffered1); - ASSERT_OK_AND_ASSIGN(bool flush_result, write_buffer->FlushMemory()); - ASSERT_TRUE(flush_result); + ASSERT_TRUE(has_remaining_quota); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); - ASSERT_OK_AND_ASSIGN(bool buffered2, + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->Write(CreateBatch(memory_array, /*row_kinds=*/{}))); - ASSERT_TRUE(buffered2); + ASSERT_TRUE(has_remaining_quota); ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); ASSERT_EQ(readers.size(), 2); @@ -401,16 +410,18 @@ TEST_F(WriteBufferTest, TestCreateReadersReturnsBothSpillAndMemoryReaders) { ])") .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(bool buf1, write_buffer->Write(CreateBatch(array1, /*row_kinds=*/{}))); - ASSERT_TRUE(buf1); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array1, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); - ASSERT_OK_AND_ASSIGN(bool flush_ok, write_buffer->FlushMemory()); - ASSERT_TRUE(flush_ok); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); ASSERT_FALSE(write_buffer->IsEmpty()); - ASSERT_OK_AND_ASSIGN(bool buf2, write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); - ASSERT_TRUE(buf2); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); ASSERT_GT(write_buffer->GetMemoryUsage(), 0); ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); @@ -457,12 +468,15 @@ TEST_F(WriteBufferTest, TestSpillReaderReturnsDataInSortedOrder) { ])") .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(bool b1, write_buffer->Write(CreateBatch(array1, /*row_kinds=*/{}))); - ASSERT_TRUE(b1); - ASSERT_OK_AND_ASSIGN(bool b2, write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); - ASSERT_TRUE(b2); - ASSERT_OK_AND_ASSIGN(bool b3, write_buffer->Write(CreateBatch(array3, /*row_kinds=*/{}))); - ASSERT_TRUE(b3); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array1, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array3, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); @@ -489,11 +503,12 @@ TEST_F(WriteBufferTest, TestSpillReaderReturnsDataInSortedOrder) { ])") .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(bool buf, write_buffer2->Write(CreateBatch(multi_row_array, {}))); - ASSERT_TRUE(buf); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer2->Write(CreateBatch(multi_row_array, {}))); + ASSERT_TRUE(has_remaining_quota); - ASSERT_OK_AND_ASSIGN(bool flush_ok, write_buffer2->FlushMemory()); - ASSERT_TRUE(flush_ok); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer2->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); ASSERT_OK_AND_ASSIGN(auto readers2, write_buffer2->CreateReaders()); ASSERT_EQ(readers2.size(), 1); @@ -512,8 +527,8 @@ TEST_F(WriteBufferTest, TestEmptyBufferBehavior) { ASSERT_TRUE(write_buffer->IsEmpty()); ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); - ASSERT_OK_AND_ASSIGN(bool flush_result, write_buffer->FlushMemory()); - ASSERT_TRUE(flush_result); + ASSERT_OK_AND_ASSIGN(bool has_remaining_disk, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_disk); ASSERT_TRUE(write_buffer->IsEmpty()); ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); @@ -524,11 +539,11 @@ TEST_F(WriteBufferTest, TestEmptyBufferBehavior) { } TEST_F(WriteBufferTest, TestMergeSpilledFilesSkipsWithSingleFile) { - // HANDLES=1: MergeSpilledFiles triggered but skips with only 1 file. + // HANDLES=2: with only 1 spill file, merge is not triggered. ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({{Options::WRITE_BUFFER_SIZE, "1"}, {Options::WRITE_BUFFER_SPILLABLE, "true"}, - {Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, "1"}})); + {Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, "2"}})); auto write_buffer = CreateWriteBuffer(/*last_sequence_number=*/-1, options); std::shared_ptr array = @@ -537,8 +552,9 @@ TEST_F(WriteBufferTest, TestMergeSpilledFilesSkipsWithSingleFile) { ])") .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(bool buffered, write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); - ASSERT_TRUE(buffered); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); ASSERT_FALSE(write_buffer->IsEmpty()); ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); ASSERT_FALSE(readers.empty()); @@ -565,11 +581,12 @@ TEST_F(WriteBufferTest, TestMultipleFlushWriteCyclesWorkCorrectly) { ])") .ValueOrDie(); // Cycle 1: Write → Flush → Read → Clear. - ASSERT_OK_AND_ASSIGN(bool buf1, write_buffer->Write(CreateBatch(array1, /*row_kinds=*/{}))); - ASSERT_TRUE(buf1); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array1, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); - ASSERT_OK_AND_ASSIGN(bool flush1, write_buffer->FlushMemory()); - ASSERT_TRUE(flush1); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); ASSERT_OK_AND_ASSIGN(auto readers1, write_buffer->CreateReaders()); ASSERT_FALSE(readers1.empty()); @@ -591,11 +608,12 @@ TEST_F(WriteBufferTest, TestMultipleFlushWriteCyclesWorkCorrectly) { ["Charlie", 30, 2, 15.1] ])") .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(bool buf2, write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); - ASSERT_TRUE(buf2); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, + write_buffer->Write(CreateBatch(array2, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); - ASSERT_OK_AND_ASSIGN(bool flush2, write_buffer->FlushMemory()); - ASSERT_TRUE(flush2); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); ASSERT_OK_AND_ASSIGN(auto readers2, write_buffer->CreateReaders()); ASSERT_FALSE(readers2.empty()); @@ -629,8 +647,9 @@ TEST_F(WriteBufferTest, TestMergeSpilledFilesDeduplicationAndRowKinds) { RecordBatch::RowKind::INSERT, RecordBatch::RowKind::UPDATE_AFTER, }; - ASSERT_OK_AND_ASSIGN(bool b1, write_buffer->Write(CreateBatch(array1, row_kinds1))); - ASSERT_TRUE(b1); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array1, row_kinds1))); + ASSERT_TRUE(has_remaining_quota); // Bob(DELETE), Alice(UPDATE_AFTER) — cross-file overlap on Alice. std::shared_ptr array2 = @@ -643,8 +662,8 @@ TEST_F(WriteBufferTest, TestMergeSpilledFilesDeduplicationAndRowKinds) { RecordBatch::RowKind::DELETE, RecordBatch::RowKind::UPDATE_AFTER, }; - ASSERT_OK_AND_ASSIGN(bool b2, write_buffer->Write(CreateBatch(array2, row_kinds2))); - ASSERT_TRUE(b2); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->Write(CreateBatch(array2, row_kinds2))); + ASSERT_TRUE(has_remaining_quota); // Diana(INSERT), Bob(INSERT) — overlap on Bob. std::shared_ptr array3 = @@ -657,8 +676,8 @@ TEST_F(WriteBufferTest, TestMergeSpilledFilesDeduplicationAndRowKinds) { RecordBatch::RowKind::INSERT, RecordBatch::RowKind::INSERT, }; - ASSERT_OK_AND_ASSIGN(bool b3, write_buffer->Write(CreateBatch(array3, row_kinds3))); - ASSERT_TRUE(b3); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->Write(CreateBatch(array3, row_kinds3))); + ASSERT_TRUE(has_remaining_quota); // After dedup: Alice keeps seq=3, Bob keeps seq=5, Charlie seq=1, Diana seq=4. ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); @@ -688,11 +707,12 @@ TEST_F(WriteBufferTest, TestSpillPreservesNullValues) { ])") .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(bool buffered, write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); - ASSERT_TRUE(buffered); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); - ASSERT_OK_AND_ASSIGN(bool flush_ok, write_buffer->FlushMemory()); - ASSERT_TRUE(flush_ok); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); ASSERT_EQ(write_buffer->GetMemoryUsage(), 0); ASSERT_OK_AND_ASSIGN(auto readers, write_buffer->CreateReaders()); @@ -735,11 +755,12 @@ TEST_F(WriteBufferTest, TestDestructorCleansUpSpillFiles) { ])") .ValueOrDie(); - ASSERT_OK_AND_ASSIGN(bool buffered, write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); - ASSERT_TRUE(buffered); + ASSERT_OK_AND_ASSIGN(bool has_remaining_quota, + write_buffer->Write(CreateBatch(array, /*row_kinds=*/{}))); + ASSERT_TRUE(has_remaining_quota); - ASSERT_OK_AND_ASSIGN(bool flush_ok, write_buffer->FlushMemory()); - ASSERT_TRUE(flush_ok); + ASSERT_OK_AND_ASSIGN(has_remaining_quota, write_buffer->FlushMemory()); + ASSERT_TRUE(has_remaining_quota); ASSERT_EQ(TestHelper::CountChannelFiles(tmp_dir_->GetFileSystem(), tmp_dir_->Str()), 1); diff --git a/src/paimon/core/operation/file_store_write.cpp b/src/paimon/core/operation/file_store_write.cpp index d2b217377..ceb27b073 100644 --- a/src/paimon/core/operation/file_store_write.cpp +++ b/src/paimon/core/operation/file_store_write.cpp @@ -213,7 +213,7 @@ Result> FileStoreWrite::Create(std::unique_ptrGetRootPath(), schema, arrow_schema, partition_schema, dv_maintainer_factory, io_manager, key_comparator, sequence_fields_comparator, merge_function_wrapper, options, ignore_previous_files, ctx->IsStreamingMode(), ctx->IgnoreNumBucketCheck(), - ctx->GetExecutor(), ctx->GetMemoryPool()); + ctx->EnableMultiThreadSpill(), ctx->GetExecutor(), ctx->GetMemoryPool()); } } diff --git a/src/paimon/core/operation/key_value_file_store_write.cpp b/src/paimon/core/operation/key_value_file_store_write.cpp index c421790fb..16b5180b5 100644 --- a/src/paimon/core/operation/key_value_file_store_write.cpp +++ b/src/paimon/core/operation/key_value_file_store_write.cpp @@ -57,13 +57,14 @@ KeyValueFileStoreWrite::KeyValueFileStoreWrite( const std::shared_ptr& user_defined_seq_comparator, const std::shared_ptr>& merge_function_wrapper, const CoreOptions& options, bool ignore_previous_files, bool is_streaming_mode, - bool ignore_num_bucket_check, const std::shared_ptr& executor, - const std::shared_ptr& pool) + bool ignore_num_bucket_check, bool enable_multi_thread_spill, + const std::shared_ptr& executor, const std::shared_ptr& pool) : AbstractFileStoreWrite(file_store_path_factory, snapshot_manager, schema_manager, commit_user, root_path, table_schema, schema, /*write_schema=*/schema, partition_schema, dv_maintainer_factory, io_manager, options, ignore_previous_files, is_streaming_mode, ignore_num_bucket_check, executor, pool), + enable_multi_thread_spill_(enable_multi_thread_spill), key_comparator_(key_comparator), user_defined_seq_comparator_(user_defined_seq_comparator), merge_function_wrapper_(merge_function_wrapper), @@ -116,7 +117,7 @@ Result> KeyValueFileStoreWrite::CreateWriter( MergeTreeWriter::Create( restore_max_seq_number, trimmed_primary_keys, data_file_path_factory, key_comparator_, user_defined_seq_comparator_, merge_function_wrapper_, table_schema_->Id(), schema_, - options_, compact_manager, io_manager_, pool_)); + options_, compact_manager, io_manager_, enable_multi_thread_spill_, pool_)); return writer; } diff --git a/src/paimon/core/operation/key_value_file_store_write.h b/src/paimon/core/operation/key_value_file_store_write.h index 2c5463a4f..b744cc4b2 100644 --- a/src/paimon/core/operation/key_value_file_store_write.h +++ b/src/paimon/core/operation/key_value_file_store_write.h @@ -62,8 +62,8 @@ class KeyValueFileStoreWrite : public AbstractFileStoreWrite { const std::shared_ptr& user_defined_seq_comparator, const std::shared_ptr>& merge_function_wrapper, const CoreOptions& options, bool ignore_previous_files, bool is_streaming_mode, - bool ignore_num_bucket_check, const std::shared_ptr& executor, - const std::shared_ptr& pool); + bool ignore_num_bucket_check, bool enable_multi_thread_spill, + const std::shared_ptr& executor, const std::shared_ptr& pool); Status Close() override; @@ -78,6 +78,7 @@ class KeyValueFileStoreWrite : public AbstractFileStoreWrite { const std::shared_ptr& filter) const override; private: + bool enable_multi_thread_spill_; std::shared_ptr key_comparator_; std::shared_ptr user_defined_seq_comparator_; std::shared_ptr> merge_function_wrapper_; diff --git a/src/paimon/core/operation/read_context_test.cpp b/src/paimon/core/operation/read_context_test.cpp index cf1c94cf9..0d11051a5 100644 --- a/src/paimon/core/operation/read_context_test.cpp +++ b/src/paimon/core/operation/read_context_test.cpp @@ -20,13 +20,15 @@ #include "gtest/gtest.h" #include "paimon/defs.h" +#include "paimon/executor.h" +#include "paimon/memory/memory_pool.h" #include "paimon/predicate/predicate_builder.h" #include "paimon/status.h" #include "paimon/testing/mock/mock_file_system.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { -TEST(ReadContextTest, TestSimple) { +TEST(ReadContextTest, TestDefaultValue) { ReadContextBuilder builder("table_root_path"); ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); ASSERT_EQ(ctx->GetPath(), "table_root_path"); @@ -50,6 +52,11 @@ TEST(ReadContextTest, TestSimple) { TEST(ReadContextTest, TestSetContent) { ReadContextBuilder builder("table_root_path"); + std::shared_ptr memory_pool = GetDefaultPool(); + std::shared_ptr executor = CreateDefaultExecutor(); + CacheConfig cache_config(/*buffer_size_limit=*/1024, /*range_size_limit=*/512, + /*hole_size_limit=*/128, /*pre_buffer_limit=*/2048); + builder.AddOption("key", "value"); builder.SetReadSchema({"f1", "f2"}); builder.SetReadFieldIds({0, 1}); @@ -63,7 +70,11 @@ TEST(ReadContextTest, TestSetContent) { builder.SetPrefetchMaxParallelNum(6); builder.EnableMultiThreadRowToBatch(true); builder.SetRowToBatchThreadNumber(9); + builder.WithMemoryPool(memory_pool); + builder.WithExecutor(executor); + builder.SetTableSchema("table-schema-json"); builder.WithBranch("rt"); + builder.WithCacheConfig(cache_config); builder.WithFileSystemSchemeToIdentifierMap({{"file", "local"}}); auto fs = std::make_shared(); builder.WithFileSystem(fs); @@ -83,7 +94,15 @@ TEST(ReadContextTest, TestSetContent) { ASSERT_EQ(6, ctx->GetPrefetchMaxParallelNum()); ASSERT_TRUE(ctx->EnableMultiThreadRowToBatch()); ASSERT_EQ(9, ctx->GetRowToBatchThreadNumber()); + ASSERT_EQ(memory_pool, ctx->GetMemoryPool()); + ASSERT_EQ(executor, ctx->GetExecutor()); + ASSERT_TRUE(ctx->GetSpecificTableSchema().has_value()); + ASSERT_EQ("table-schema-json", ctx->GetSpecificTableSchema().value()); ASSERT_EQ("rt", ctx->GetBranch()); + ASSERT_EQ(1024U, ctx->GetCacheConfig().GetBufferSizeLimit()); + ASSERT_EQ(512U, ctx->GetCacheConfig().GetRangeSizeLimit()); + ASSERT_EQ(128U, ctx->GetCacheConfig().GetHoleSizeLimit()); + ASSERT_EQ(2048U, ctx->GetCacheConfig().GetPreBufferLimit()); std::map expected_fs_map = {{"file", "local"}}; ASSERT_EQ(expected_fs_map, ctx->GetFileSystemSchemeToIdentifierMap()); std::map expected_options = {{"key", "value"}}; @@ -91,4 +110,15 @@ TEST(ReadContextTest, TestSetContent) { ASSERT_EQ(ctx->GetSpecificFileSystem(), fs); } +TEST(ReadContextTest, TestSetOptionsOverridesAddedOptions) { + ReadContextBuilder builder("table_root_path"); + builder.AddOption("old", "value"); + builder.SetOptions({{"key1", "value1"}, {"key2", "value2"}}); + + ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); + + std::map expected_options = {{"key1", "value1"}, {"key2", "value2"}}; + ASSERT_EQ(expected_options, ctx->GetOptions()); +} + } // namespace paimon::test diff --git a/src/paimon/core/operation/scan_context_test.cpp b/src/paimon/core/operation/scan_context_test.cpp index ee0f77153..40a3da004 100644 --- a/src/paimon/core/operation/scan_context_test.cpp +++ b/src/paimon/core/operation/scan_context_test.cpp @@ -18,14 +18,16 @@ #include "gtest/gtest.h" #include "paimon/defs.h" +#include "paimon/executor.h" #include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/memory/memory_pool.h" #include "paimon/predicate/predicate_builder.h" #include "paimon/status.h" #include "paimon/testing/mock/mock_file_system.h" #include "paimon/testing/utils/testharness.h" namespace paimon::test { -TEST(ScanContextTest, TestSimple) { +TEST(ScanContextTest, TestDefaultValue) { ScanContextBuilder builder("table_root_path"); ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); ASSERT_EQ(ctx->GetPath(), "table_root_path"); @@ -37,12 +39,16 @@ TEST(ScanContextTest, TestSimple) { ASSERT_FALSE(ctx->GetScanFilters()->GetBucketFilter()); ASSERT_FALSE(ctx->GetScanFilters()->GetPredicate()); ASSERT_TRUE(ctx->GetScanFilters()->GetPartitionFilters().empty()); + ASSERT_TRUE(ctx->GetOptions().empty()); ASSERT_FALSE(ctx->GetGlobalIndexResult()); ASSERT_FALSE(ctx->GetSpecificFileSystem()); } -TEST(ScanContextTest, TestSetFilter) { +TEST(ScanContextTest, TestSetContent) { ScanContextBuilder builder("table_root_path"); + std::shared_ptr memory_pool = GetDefaultPool(); + std::shared_ptr executor = CreateDefaultExecutor(); + builder.SetBucketFilter(10); std::vector> partition_filters = {{{"f1", "20"}}}; builder.SetPartitionFilter(partition_filters); @@ -55,6 +61,8 @@ TEST(ScanContextTest, TestSetFilter) { builder.SetLimit(1000); builder.AddOption("key", "value"); builder.WithStreamingMode(true); + builder.WithMemoryPool(memory_pool); + builder.WithExecutor(executor); auto fs = std::make_shared(); builder.WithFileSystem(fs); ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); @@ -66,9 +74,22 @@ TEST(ScanContextTest, TestSetFilter) { ASSERT_EQ(*predicate, *(ctx->GetScanFilters()->GetPredicate())); ASSERT_EQ(partition_filters, ctx->GetScanFilters()->GetPartitionFilters()); ASSERT_EQ("{1,2,4,5}", ctx->GetGlobalIndexResult()->ToString()); + ASSERT_EQ(memory_pool, ctx->GetMemoryPool()); + ASSERT_EQ(executor, ctx->GetExecutor()); std::map expected_options = {{"key", "value"}}; ASSERT_EQ(expected_options, ctx->GetOptions()); ASSERT_EQ(fs, ctx->GetSpecificFileSystem()); } +TEST(ScanContextTest, TestSetOptionsOverridesAddedOptions) { + ScanContextBuilder builder("table_root_path"); + builder.AddOption("old", "value"); + builder.SetOptions({{"key1", "value1"}, {"key2", "value2"}}); + + ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); + + std::map expected_options = {{"key1", "value1"}, {"key2", "value2"}}; + ASSERT_EQ(expected_options, ctx->GetOptions()); +} + } // namespace paimon::test diff --git a/src/paimon/core/operation/write_context.cpp b/src/paimon/core/operation/write_context.cpp index ce9e52174..d2ad9efea 100644 --- a/src/paimon/core/operation/write_context.cpp +++ b/src/paimon/core/operation/write_context.cpp @@ -18,6 +18,8 @@ #include +#include "arrow/util/thread_pool.h" +#include "paimon/common/utils/arrow/status_utils.h" #include "paimon/common/utils/path_util.h" #include "paimon/core/utils/branch_manager.h" #include "paimon/executor.h" @@ -29,8 +31,9 @@ namespace paimon { WriteContext::WriteContext(const std::string& root_path, const std::string& commit_user, bool is_streaming_mode, bool ignore_num_bucket_check, - bool ignore_previous_files, const std::optional& write_id, - const std::string& branch, const std::vector& write_schema, + bool ignore_previous_files, bool enable_multi_thread_spill, + const std::optional& write_id, const std::string& branch, + const std::vector& write_schema, const std::shared_ptr& memory_pool, const std::shared_ptr& executor, const std::string& temp_directory, @@ -43,6 +46,7 @@ WriteContext::WriteContext(const std::string& root_path, const std::string& comm is_streaming_mode_(is_streaming_mode), ignore_num_bucket_check_(ignore_num_bucket_check), ignore_previous_files_(ignore_previous_files), + enable_multi_thread_spill_(enable_multi_thread_spill), write_id_(write_id), write_schema_(write_schema), memory_pool_(memory_pool), @@ -63,6 +67,7 @@ class WriteContextBuilder::Impl { is_streaming_mode_ = false; ignore_num_bucket_check_ = false; ignore_previous_files_ = false; + spill_thread_number_ = 0; memory_pool_ = GetDefaultPool(); executor_ = CreateDefaultExecutor(); temp_directory_.clear(); @@ -81,6 +86,7 @@ class WriteContextBuilder::Impl { bool is_streaming_mode_ = false; bool ignore_num_bucket_check_ = false; bool ignore_previous_files_ = false; + int32_t spill_thread_number_ = 0; std::vector write_schema_; std::shared_ptr memory_pool_ = GetDefaultPool(); std::shared_ptr executor_ = CreateDefaultExecutor(); @@ -164,6 +170,11 @@ WriteContextBuilder& WriteContextBuilder::WithFileSystemSchemeToIdentifierMap( return *this; } +WriteContextBuilder& WriteContextBuilder::SetWriteBufferSpillThreadNumber(int32_t thread_number) { + impl_->spill_thread_number_ = thread_number; + return *this; +} + WriteContextBuilder& WriteContextBuilder::WithFileSystem( const std::shared_ptr& file_system) { impl_->specific_file_system_ = file_system; @@ -175,12 +186,17 @@ Result> WriteContextBuilder::Finish() { if (impl_->root_path_.empty()) { return Status::Invalid("root path is empty"); } + bool enable_multi_thread_spill = impl_->spill_thread_number_ > 0; + if (enable_multi_thread_spill) { + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::SetCpuThreadPoolCapacity(impl_->spill_thread_number_)); + } auto ctx = std::make_unique( impl_->root_path_, impl_->commit_user_, impl_->is_streaming_mode_, - impl_->ignore_num_bucket_check_, impl_->ignore_previous_files_, impl_->write_id_, - impl_->branch_, impl_->write_schema_, impl_->memory_pool_, impl_->executor_, - impl_->temp_directory_, impl_->specific_file_system_, impl_->fs_scheme_to_identifier_map_, - impl_->options_); + impl_->ignore_num_bucket_check_, impl_->ignore_previous_files_, enable_multi_thread_spill, + impl_->write_id_, impl_->branch_, impl_->write_schema_, impl_->memory_pool_, + impl_->executor_, impl_->temp_directory_, impl_->specific_file_system_, + impl_->fs_scheme_to_identifier_map_, impl_->options_); impl_->Reset(); return ctx; } diff --git a/src/paimon/core/operation/write_context_test.cpp b/src/paimon/core/operation/write_context_test.cpp index 210aa4ec1..dbe66f020 100644 --- a/src/paimon/core/operation/write_context_test.cpp +++ b/src/paimon/core/operation/write_context_test.cpp @@ -26,7 +26,7 @@ namespace paimon::test { -TEST(WriteContextTest, TestSimple) { +TEST(WriteContextTest, TestDefaultValue) { WriteContextBuilder builder("table_root_path", "commit_user_1"); ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); ASSERT_EQ(ctx->GetRootPath(), "table_root_path"); @@ -34,6 +34,7 @@ TEST(WriteContextTest, TestSimple) { ASSERT_FALSE(ctx->IsStreamingMode()); ASSERT_FALSE(ctx->IgnoreNumBucketCheck()); ASSERT_FALSE(ctx->IgnorePreviousFiles()); + ASSERT_FALSE(ctx->EnableMultiThreadSpill()); ASSERT_EQ(ctx->GetWriteId(), std::nullopt); ASSERT_EQ(ctx->GetBranch(), "main"); ASSERT_TRUE(ctx->GetWriteSchema().empty()); @@ -42,9 +43,10 @@ TEST(WriteContextTest, TestSimple) { ASSERT_TRUE(ctx->GetTempDirectory().empty()); ASSERT_TRUE(ctx->GetOptions().empty()); ASSERT_TRUE(ctx->GetFileSystemSchemeToIdentifierMap().empty()); + ASSERT_FALSE(ctx->GetSpecificFileSystem()); } -TEST(WriteContextTest, TestAllWithMethods) { +TEST(WriteContextTest, TestSetContent) { WriteContextBuilder builder("table_root_path", "commit_user_1"); auto memory_pool = GetDefaultPool(); @@ -66,11 +68,13 @@ TEST(WriteContextTest, TestAllWithMethods) { .WithWriteSchema(write_schema) .WithFileSystemSchemeToIdentifierMap(fs_scheme_to_identifier_map) .WithFileSystem(file_system) + .AddOption("key", "value") .Finish()); ASSERT_TRUE(ctx->IsStreamingMode()); ASSERT_TRUE(ctx->IgnoreNumBucketCheck()); ASSERT_TRUE(ctx->IgnorePreviousFiles()); + ASSERT_FALSE(ctx->EnableMultiThreadSpill()); ASSERT_EQ(ctx->GetMemoryPool(), memory_pool); ASSERT_EQ(ctx->GetExecutor(), executor); ASSERT_EQ(ctx->GetTempDirectory(), "/tmp/with-all"); @@ -79,6 +83,28 @@ TEST(WriteContextTest, TestAllWithMethods) { ASSERT_EQ(ctx->GetWriteSchema(), write_schema); ASSERT_EQ(ctx->GetFileSystemSchemeToIdentifierMap(), fs_scheme_to_identifier_map); ASSERT_EQ(ctx->GetSpecificFileSystem(), file_system); + std::map expected_options = {{"key", "value"}}; + ASSERT_EQ(expected_options, ctx->GetOptions()); +} + +TEST(WriteContextTest, TestSetOptionsOverridesAddedOptions) { + WriteContextBuilder builder("table_root_path", "commit_user_1"); + builder.AddOption("old", "value"); + builder.SetOptions({{"key1", "value1"}, {"key2", "value2"}}); + + ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); + + std::map expected_options = {{"key1", "value1"}, {"key2", "value2"}}; + ASSERT_EQ(expected_options, ctx->GetOptions()); +} + +TEST(WriteContextTest, TestSetWriteBufferSpillThreadNumber) { + WriteContextBuilder builder("table_root_path", "commit_user_1"); + builder.SetWriteBufferSpillThreadNumber(2); + + ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish()); + + ASSERT_TRUE(ctx->EnableMultiThreadSpill()); } } // namespace paimon::test diff --git a/test/inte/write_inte_test.cpp b/test/inte/write_inte_test.cpp index a5d1ea456..8679d5962 100644 --- a/test/inte/write_inte_test.cpp +++ b/test/inte/write_inte_test.cpp @@ -4244,8 +4244,8 @@ TEST_P(WriteInteTest, TestPkSpillableIntermediateMergeWithTempFileTracking) { auto batch3 = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([["Alice", 10, 3]])").ValueOrDie(); - // Each write triggers spill. With LOCAL_SORT_MAX_NUM_FILE_HANDLES=2, intermediate - // merge keeps the number of temp files bounded. + // Each write triggers spill. With LOCAL_SORT_MAX_NUM_FILE_HANDLES=2, leveled + // merge triggers when a level reaches max_fan_in files. ASSERT_OK(write_array_fn(file_store_write.get(), {{"pt", "10"}}, /*bucket=*/0, batch1)); ASSERT_EQ(1, TestHelper::CountChannelFiles(file_system_, tmp_dir)); @@ -4253,7 +4253,8 @@ TEST_P(WriteInteTest, TestPkSpillableIntermediateMergeWithTempFileTracking) { ASSERT_EQ(1, TestHelper::CountChannelFiles(file_system_, tmp_dir)); ASSERT_OK(write_array_fn(file_store_write.get(), {{"pt", "10"}}, /*bucket=*/0, batch3)); - ASSERT_EQ(1, TestHelper::CountChannelFiles(file_system_, tmp_dir)); + // Level 0: 1 file (batch3), Level 1: 1 file (merged batch1+batch2) = 2 files total. + ASSERT_EQ(2, TestHelper::CountChannelFiles(file_system_, tmp_dir)); // PrepareCommit should consume all spill files ASSERT_OK_AND_ASSIGN(auto commit_messages, @@ -4315,7 +4316,9 @@ TEST_P(WriteInteTest, TestPkSpillableMultiBucketMultiRoundDataCorrectness) { }; WriteContextBuilder write_builder(table_path, "commit_user_1"); - write_builder.WithStreamingMode(true).WithTempDirectory(tmp_dir); + write_builder.WithStreamingMode(true) + .WithTempDirectory(tmp_dir) + .SetWriteBufferSpillThreadNumber(3); ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, write_builder.Finish()); ASSERT_OK_AND_ASSIGN(auto file_store_write, FileStoreWrite::Create(std::move(write_context))); @@ -4329,12 +4332,12 @@ TEST_P(WriteInteTest, TestPkSpillableMultiBucketMultiRoundDataCorrectness) { ASSERT_OK(write_array_fn(file_store_write.get(), {{"pt", "10"}}, /*bucket=*/0, r1_b0_batch1)); ASSERT_EQ(1, TestHelper::CountChannelFiles(file_system_, tmp_dir)); - // Trigger spill merge + // Trigger leveled merge (level 0 reaches max_fan_in=2) ASSERT_OK(write_array_fn(file_store_write.get(), {{"pt", "10"}}, /*bucket=*/0, r1_b0_batch2)); ASSERT_EQ(1, TestHelper::CountChannelFiles(file_system_, tmp_dir)); - // Trigger spill merge + // Third write: level 0 has 1 file, level 1 has 1 file = 2 total ASSERT_OK(write_array_fn(file_store_write.get(), {{"pt", "10"}}, /*bucket=*/0, r1_b0_batch3)); - ASSERT_EQ(1, TestHelper::CountChannelFiles(file_system_, tmp_dir)); + ASSERT_EQ(2, TestHelper::CountChannelFiles(file_system_, tmp_dir)); auto r1_b1_batch1 = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([["Dave", 10, 10]])").ValueOrDie(); @@ -4343,14 +4346,15 @@ TEST_P(WriteInteTest, TestPkSpillableMultiBucketMultiRoundDataCorrectness) { auto r1_b1_batch3 = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([["Dave", 10, 30]])").ValueOrDie(); + int32_t bucket0_files = TestHelper::CountChannelFiles(file_system_, tmp_dir); ASSERT_OK(write_array_fn(file_store_write.get(), {{"pt", "10"}}, /*bucket=*/1, r1_b1_batch1)); - ASSERT_EQ(2, TestHelper::CountChannelFiles(file_system_, tmp_dir)); - // Trigger spill merge + ASSERT_EQ(bucket0_files + 1, TestHelper::CountChannelFiles(file_system_, tmp_dir)); + // Trigger leveled merge for bucket 1 ASSERT_OK(write_array_fn(file_store_write.get(), {{"pt", "10"}}, /*bucket=*/1, r1_b1_batch2)); - ASSERT_EQ(2, TestHelper::CountChannelFiles(file_system_, tmp_dir)); - // Trigger spill merge + ASSERT_EQ(bucket0_files + 1, TestHelper::CountChannelFiles(file_system_, tmp_dir)); + // Third write for bucket 1: same pattern ASSERT_OK(write_array_fn(file_store_write.get(), {{"pt", "10"}}, /*bucket=*/1, r1_b1_batch3)); - ASSERT_EQ(2, TestHelper::CountChannelFiles(file_system_, tmp_dir)); + ASSERT_EQ(bucket0_files + 2, TestHelper::CountChannelFiles(file_system_, tmp_dir)); ASSERT_OK_AND_ASSIGN(auto commit_messages_1, file_store_write->PrepareCommit(/*wait_compaction=*/false,