Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
ac25c24
feat(spill): optimize external sort with leveled merger to reduce wri…
zjw1111 May 18, 2026
cae5efe
style: fix code style issues in leveled merger
zjw1111 May 18, 2026
ab3d4df
fix(test): adapt tests to leveled merger behavior
zjw1111 May 18, 2026
5bd1e41
fix: reset merge_function_wrapper on flush error to prevent use-after…
zjw1111 May 18, 2026
9f16343
Merge branch 'main' into optimize-spill-performance
lxy-9602 May 18, 2026
18f3fe8
fix: reset merge_function_wrapper in SortMergeReader::Close to preven…
zjw1111 May 18, 2026
e85d7d5
refactor: rename LeveledMerger to SpillFileMerger and improve code cl…
zjw1111 May 19, 2026
b3cecf0
test: add SpillWithIOException and SetMaxFanIn suppression tests
zjw1111 May 19, 2026
08914ef
test: improve spill test clarity and rename compaction to merge in co…
zjw1111 May 19, 2026
6a18bf9
Merge branch 'main' into optimize-spill-performance
zjw1111 May 19, 2026
5ef21f3
fix
zjw1111 May 19, 2026
1272f79
Merge branch 'main' into optimize-spill-performance
zjw1111 May 20, 2026
1dcb828
Merge branch 'main' into optimize-spill-performance
zjw1111 May 20, 2026
c6bd23f
Merge branch 'main' into optimize-spill-performance
lxy-9602 May 20, 2026
244096f
fix(options): align core option defaults with Java
zjw1111 May 20, 2026
f5841d1
feat: add SetWriteBufferSpillThreadNumber to control spill thread usage
zjw1111 May 20, 2026
22e0b66
test: enhance ReadContext, ScanContext, and WriteContext tests with d…
zjw1111 May 20, 2026
b99bacd
Merge branch 'main' into optimize-spill-performance
lucasfang May 20, 2026
170607a
fix
zjw1111 May 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/code-style.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ This document defines the coding conventions for the paimon-cpp project. All pul

---

## Integer Types

Use **fixed-width integer types** from `<cstdint>`. Do **not** use plain `int`, `long`, `short`, or `unsigned`.

| Use | Instead of |
|-----|-----------|
| `int8_t` / `uint8_t` | `char` (for numeric data) |
| `int16_t` / `uint16_t` | `short` |
| `int32_t` / `uint32_t` | `int` / `unsigned int` |
| `int64_t` / `uint64_t` | `long` / `long long` |

**No exception for loop variables**: Even loop variables iterating over small, bounded ranges must use `int32_t`, not `int` (e.g. `for (int32_t i = 0; i < n; ++i)`).

---

## Formatting

Formatting is based on **Google C++ Style** with the following overrides (defined in `.clang-format`):
Expand Down
4 changes: 2 additions & 2 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ struct PAIMON_EXPORT Options {
static const char SNAPSHOT_TIME_RETAINED[];

/// "snapshot.expire.limit" - The maximum number of snapshots allowed to expire at a time.
/// Default value is 10.
/// Default value is 50.
static const char SNAPSHOT_EXPIRE_LIMIT[];

/// "snapshot.clean-empty-directories" - Whether to try to clean empty directories when expiring
Expand Down Expand Up @@ -391,7 +391,7 @@ struct PAIMON_EXPORT Options {
/// reading the audit_log or binlog system tables. This is only valid for primary key tables.
/// Default value is "false".
static const char TABLE_READ_SEQUENCE_NUMBER_ENABLED[];
/// "key-value.sequence-number.enabled" - Whether to include the _SEQUENCE_NUMBER field when
/// "key-value.sequence_number.enabled" - Whether to include the _SEQUENCE_NUMBER field when
/// reading key-value data. This is an internal option used by AuditLogTable and BinlogTable
/// when table-read.sequence-number.enabled is set to true. Default value is "false".
static const char KEY_VALUE_SEQUENCE_NUMBER_ENABLED[];
Expand Down
16 changes: 14 additions & 2 deletions include/paimon/write_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ class PAIMON_EXPORT WriteContext {
public:
WriteContext(const std::string& root_path, const std::string& commit_user,
bool is_streaming_mode, bool ignore_num_bucket_check, bool ignore_previous_files,
const std::optional<int32_t>& write_id, const std::string& branch,
const std::vector<std::string>& write_schema,
bool enable_multi_thread_spill, const std::optional<int32_t>& write_id,
const std::string& branch, const std::vector<std::string>& write_schema,
const std::shared_ptr<MemoryPool>& memory_pool,
const std::shared_ptr<Executor>& executor, const std::string& temp_directory,
const std::shared_ptr<FileSystem>& specific_file_system,
Expand Down Expand Up @@ -106,13 +106,18 @@ class PAIMON_EXPORT WriteContext {
return specific_file_system_;
}

/// Get the multi-thread spill flag held by this context.
/// @return The value of `enable_multi_thread_spill_`.
bool EnableMultiThreadSpill() const {
return enable_multi_thread_spill_;
}

private:
std::string root_path_;
std::string commit_user_;
std::string branch_;
bool is_streaming_mode_;
bool ignore_num_bucket_check_;
bool ignore_previous_files_;
bool enable_multi_thread_spill_;
std::optional<int32_t> write_id_;
std::vector<std::string> write_schema_;
std::shared_ptr<MemoryPool> memory_pool_;
Expand Down Expand Up @@ -222,6 +227,13 @@ class PAIMON_EXPORT WriteContextBuilder {
WriteContextBuilder& WithFileSystemSchemeToIdentifierMap(
const std::map<std::string, std::string>& fs_scheme_to_identifier_map);

/// Set the thread number for write buffer spill operations (default is 0).
/// If <= 0, threading is disabled for spill IPC read/write.
/// If > 0, sets arrow CPU thread pool capacity for spill operations.
/// @param thread_number The thread number to use for spill operations.
/// @return Reference to this builder for method chaining.
WriteContextBuilder& SetWriteBufferSpillThreadNumber(int32_t thread_number);

/// Build and return a `WriteContext` instance with input validation.
/// @return Result containing the constructed `WriteContext` or an error status.
Result<std::unique_ptr<WriteContext>> Finish();
Expand Down
2 changes: 2 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ set(PAIMON_CORE_SRCS
core/mergetree/merge_tree_writer.cpp
core/mergetree/in_memory_sort_buffer.cpp
core/mergetree/external_sort_buffer.cpp
core/mergetree/spill_file_merger.cpp
core/mergetree/write_buffer.cpp
core/mergetree/levels.cpp
core/mergetree/lookup_file.cpp
Expand Down Expand Up @@ -651,6 +652,7 @@ if(PAIMON_BUILD_TESTS)
core/mergetree/merge_tree_writer_test.cpp
core/mergetree/write_buffer_test.cpp
core/mergetree/sort_buffer_test.cpp
core/mergetree/spill_file_merger_test.cpp
core/mergetree/sorted_run_test.cpp
core/mergetree/spill_channel_manager_test.cpp
core/mergetree/spill_reader_writer_test.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ const char Options::GLOBAL_INDEX_THREAD_NUM[] = "global-index.thread-num";
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
const char Options::AGGREGATION_REMOVE_RECORD_ON_DELETE[] = "aggregation.remove-record-on-delete";
const char Options::TABLE_READ_SEQUENCE_NUMBER_ENABLED[] = "table-read.sequence-number.enabled";
const char Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED[] = "key-value.sequence-number.enabled";
const char Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED[] = "key-value.sequence_number.enabled";
const char Options::SCAN_TIMESTAMP_MILLIS[] = "scan.timestamp-millis";
const char Options::SCAN_TIMESTAMP[] = "scan.timestamp";
const char Options::SCAN_TAG_NAME[] = "scan.tag-name";
Expand Down
4 changes: 2 additions & 2 deletions src/paimon/common/fs/file_system_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class FileSystemTest : public ::testing::Test, public ::testing::WithParamInterf
std::string test_root_;
};

TEST_P(FileSystemTest, TestNoneFileSystemFactory) {
TEST(FileSystemStaticTest, TestNoneFileSystemFactory) {
std::map<std::string, std::string> fs_options;
Result<std::unique_ptr<FileSystem>> fs =
FileSystemFactory::Get("not exist", "/tmp", fs_options);
Expand Down Expand Up @@ -516,7 +516,7 @@ TEST_P(FileSystemTest, TestReadAndWriteAndAtomicStoreFile) {
ASSERT_EQ(read_content, new_content);
}

TEST_P(FileSystemTest, TestIsObjectStore) {
TEST(FileSystemStaticTest, TestIsObjectStore) {
ASSERT_OK_AND_ASSIGN(bool is_object_store, FileSystem::IsObjectStore("file:///tmp/test.txt"));
ASSERT_FALSE(is_object_store);
ASSERT_OK_AND_ASSIGN(is_object_store, FileSystem::IsObjectStore("/tmp/test.txt"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1491,7 +1491,7 @@ TEST_P(BTreeGlobalIndexIntegrationTest, WriteAndReadAllNull) {
}
}

TEST_P(BTreeGlobalIndexIntegrationTest, WriteAndReadLargeDataWithSmallBlocks) {
TEST_F(BTreeGlobalIndexIntegrationTest, WriteAndReadLargeDataWithSmallBlocks) {
// Use very small block size and cache size to force multiple block evictions
auto file_writer = std::make_shared<FakeGlobalIndexFileWriter>(fs_, base_path_);
auto field = arrow::field("int_field", arrow::int32());
Expand Down Expand Up @@ -1710,7 +1710,7 @@ TEST_P(BTreeGlobalIndexIntegrationTest, CreateReaderWithMultiFieldSchema) {
"supposed to have single field");
}

TEST_P(BTreeGlobalIndexIntegrationTest, CreateWriterWithMissingField) {
TEST_F(BTreeGlobalIndexIntegrationTest, CreateWriterWithMissingField) {
auto file_writer = std::make_shared<FakeGlobalIndexFileWriter>(fs_, base_path_);
auto type = arrow::struct_({arrow::field("existing_field", arrow::int32())});
auto struct_type = std::dynamic_pointer_cast<arrow::StructType>(type);
Expand Down
6 changes: 3 additions & 3 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,8 +583,8 @@ struct CoreOptions::Impl {
int32_t snapshot_num_retain_min = 10;
// Parse snapshot.num-retained.max - maximum completed snapshots to retain
int32_t snapshot_num_retain_max = std::numeric_limits<int32_t>::max();
// Parse snapshot.expire.limit - maximum snapshots allowed to expire at a time, default 10
int32_t snapshot_expire_limit = 10;
// Parse snapshot.expire.limit - maximum snapshots allowed to expire at a time, default 50
int32_t snapshot_expire_limit = 50;
// Parse snapshot.time-retained - maximum time of completed snapshots to retain
int64_t snapshot_time_retained = 1 * 3600 * 1000; // 1 hour
PAIMON_RETURN_NOT_OK(
Expand Down Expand Up @@ -642,7 +642,7 @@ struct CoreOptions::Impl {
// Parse table-read.sequence-number.enabled - expose sequence number in system tables
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::TABLE_READ_SEQUENCE_NUMBER_ENABLED,
&table_read_sequence_number_enabled));
// Parse key-value.sequence-number.enabled - internal sequence number read switch
// Parse key-value.sequence_number.enabled - internal sequence number read switch
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED,
&key_value_sequence_number_enabled));
// Parse partial-update.remove-record-on-sequence-group
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ TEST(CoreOptionsTest, TestDefaultValue) {
ExpireConfig expire_config = core_options.GetExpireConfig();
ASSERT_EQ(10, expire_config.GetSnapshotRetainMin());
ASSERT_EQ(std::numeric_limits<int32_t>::max(), expire_config.GetSnapshotRetainMax());
ASSERT_EQ(10, expire_config.GetSnapshotMaxDeletes());
ASSERT_EQ(50, expire_config.GetSnapshotMaxDeletes());
ASSERT_FALSE(expire_config.CleanEmptyDirectories());
ASSERT_EQ(1 * 3600 * 1000L, expire_config.GetSnapshotTimeRetainMs());
ASSERT_EQ(std::vector<std::string>(), core_options.GetSequenceField());
Expand Down
6 changes: 5 additions & 1 deletion src/paimon/core/disk/file_io_channel.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#pragma once

#include <cstdint>
#include <memory>
#include <random>
#include <string>

Expand Down Expand Up @@ -68,4 +67,9 @@ class PAIMON_EXPORT FileIOChannel {
};
};

/// Pairs a file I/O channel identifier with a file size.
struct FileChannelInfo {
/// Identifier of the channel this entry describes.
FileIOChannel::ID channel_id;
/// File size associated with the channel; presumably in bytes — TODO confirm.
/// No default member initializer: a default-initialized instance leaves this value
/// indeterminate, so it must be set explicitly.
int64_t file_size;
};

} // namespace paimon
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ class LookupMergeTreeCompactRewriterTest : public ::testing::TestWithParam<std::
/*last_sequence_number=*/last_sequence_number, std::vector<std::string>({"key"}),
data_path_factory, key_comparator, /*user_defined_seq_comparator=*/nullptr,
merge_function_wrapper, /*schema_id=*/latest_schema.value()->Id(), arrow_schema_,
options, std::make_shared<NoopCompactManager>(), /*io_manager=*/nullptr, pool_));
options, std::make_shared<NoopCompactManager>(), /*io_manager=*/nullptr,
/*enable_multi_thread_spill=*/false, pool_));

// write data
ArrowArray c_src_array;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,10 @@ Result<CompactResult> MergeTreeCompactRewriter::RewriteCompaction(

ScopeGuard write_guard([&]() -> void {
rolling_writer->Abort();
merge_file_split_read_.reset();
for (const auto& reader : reader_holders) {
reader->Close();
}
merge_file_split_read_.reset();
});

for (const auto& section : sections) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ class SortMergeReaderWithLoserTree : public SortMergeReader {

/// Closes the loser tree, then resets the merge function wrapper if one is set.
void Close() override {
loser_tree_->Close();
// The wrapper pointer is checked first, so a null wrapper is simply skipped.
if (merge_function_wrapper_) {
merge_function_wrapper_->Reset();
}
}

class Iterator : public SortMergeReader::Iterator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ class SortMergeReaderWithMinHeap : public SortMergeReader {
for (const auto& reader : readers_holder_) {
reader->Close();
}
if (merge_function_wrapper_) {
merge_function_wrapper_->Reset();
}
}

std::shared_ptr<Metrics> GetReaderMetrics() const override {
Expand Down
Loading
Loading