Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/paimon/core/catalog/commit_table_request_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ TEST(CommitTableRequestTest, TestSimple) {
/*statistics=*/std::nullopt, /*properties=*/std::nullopt, /*next_row_id=*/0);
std::vector<PartitionStatistics> partition_statistics = {
PartitionStatistics(/*spec=*/{{"f1", "20"}}, /*record_count=*/1, /*file_size_in_bytes=*/541,
/*file_count=*/1, /*last_file_creation_time=*/1724090888743),
/*file_count=*/1, /*last_file_creation_time=*/1724090888743,
/*total_buckets=*/-1),
PartitionStatistics(/*spec=*/{{"f1", "10"}}, /*record_count=*/4,
/*file_size_in_bytes=*/1118, /*file_count=*/2,
/*last_file_creation_time=*/1724090888727)};
/*last_file_creation_time=*/1724090888727, /*total_buckets=*/-1)};
std::string expected_request_str = R"({
"snapshot": {
"version": 3,
Expand Down Expand Up @@ -74,7 +75,8 @@ TEST(CommitTableRequestTest, TestSimple) {
"recordCount": 1,
"fileSizeInBytes": 541,
"fileCount": 1,
"lastFileCreationTime": 1724090888743
"lastFileCreationTime": 1724090888743,
"totalBuckets": -1
},
{
"spec": {
Expand All @@ -83,7 +85,8 @@ TEST(CommitTableRequestTest, TestSimple) {
"recordCount": 4,
"fileSizeInBytes": 1118,
"fileCount": 2,
"lastFileCreationTime": 1724090888727
"lastFileCreationTime": 1724090888727,
"totalBuckets": -1
}
]
})";
Expand Down
24 changes: 14 additions & 10 deletions src/paimon/core/manifest/partition_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,31 @@
namespace paimon {
PartitionEntry::PartitionEntry(const BinaryRow& partition, int64_t record_count,
int64_t file_size_in_bytes, int64_t file_count,
int64_t last_file_creation_time)
int64_t last_file_creation_time, int32_t total_buckets)
: partition_(partition),
record_count_(record_count),
file_size_in_bytes_(file_size_in_bytes),
file_count_(file_count),
last_file_creation_time_(last_file_creation_time) {}
last_file_creation_time_(last_file_creation_time),
total_buckets_(total_buckets) {}

PartitionEntry PartitionEntry::Merge(const PartitionEntry& entry) const {
return PartitionEntry(partition_, record_count_ + entry.record_count_,
file_size_in_bytes_ + entry.file_size_in_bytes_,
file_count_ + entry.file_count_,
std::max(last_file_creation_time_, entry.last_file_creation_time_));
return PartitionEntry(
partition_, record_count_ + entry.record_count_,
file_size_in_bytes_ + entry.file_size_in_bytes_, file_count_ + entry.file_count_,
std::max(last_file_creation_time_, entry.last_file_creation_time_), entry.total_buckets_);
}

Result<PartitionEntry> PartitionEntry::FromDataFile(const BinaryRow& partition,
const FileKind& kind,
const std::shared_ptr<DataFileMeta>& file) {
const std::shared_ptr<DataFileMeta>& file,
const int32_t total_buckets) {
int64_t record_count = kind == FileKind::Delete() ? -file->row_count : file->row_count;
int64_t file_size_in_bytes = kind == FileKind::Delete() ? -file->file_size : file->file_size;
int64_t file_count = kind == FileKind::Delete() ? -1 : 1;
PAIMON_ASSIGN_OR_RAISE(int64_t utc_millis, file->CreationTimeEpochMillis());
return PartitionEntry(partition, record_count, file_size_in_bytes, file_count, utc_millis);
return PartitionEntry(partition, record_count, file_size_in_bytes, file_count, utc_millis,
total_buckets);
}

Status PartitionEntry::Merge(const std::vector<ManifestEntry>& from,
Expand All @@ -75,7 +78,7 @@ Result<PartitionStatistics> PartitionEntry::ToPartitionStatistics(
PAIMON_ASSIGN_OR_RAISE(part_values, partition_computer->GeneratePartitionVector(partition_));
std::map<std::string, std::string> part_values_map(part_values.begin(), part_values.end());
return PartitionStatistics(part_values_map, record_count_, file_size_in_bytes_, file_count_,
last_file_creation_time_);
last_file_creation_time_, total_buckets_);
}

bool PartitionEntry::operator==(const PartitionEntry& other) const {
Expand All @@ -84,7 +87,8 @@ bool PartitionEntry::operator==(const PartitionEntry& other) const {
}
return partition_ == other.partition_ && record_count_ == other.record_count_ &&
file_size_in_bytes_ == other.file_size_in_bytes_ && file_count_ == other.file_count_ &&
last_file_creation_time_ == other.last_file_creation_time_;
last_file_creation_time_ == other.last_file_creation_time_ &&
total_buckets_ == other.total_buckets_;
}

} // namespace paimon
11 changes: 8 additions & 3 deletions src/paimon/core/manifest/partition_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class BinaryRowPartitionComputer;
class PartitionEntry {
public:
PartitionEntry(const BinaryRow& partition, int64_t record_count, int64_t file_size_in_bytes,
int64_t file_count, int64_t last_file_creation_time);
int64_t file_count, int64_t last_file_creation_time, int32_t total_buckets);

const BinaryRow& Partition() const {
return partition_;
Expand All @@ -58,14 +58,18 @@ class PartitionEntry {
int64_t LastFileCreationTime() const {
return last_file_creation_time_;
}
int32_t TotalBuckets() const {
return total_buckets_;
}
PartitionEntry Merge(const PartitionEntry& entry) const;

static Result<PartitionEntry> FromManifestEntry(const ManifestEntry& entry) {
return FromDataFile(entry.Partition(), entry.Kind(), entry.File());
return FromDataFile(entry.Partition(), entry.Kind(), entry.File(), entry.TotalBuckets());
}

static Result<PartitionEntry> FromDataFile(const BinaryRow& partition, const FileKind& kind,
const std::shared_ptr<DataFileMeta>& file);
const std::shared_ptr<DataFileMeta>& file,
int32_t total_buckets);

Result<PartitionStatistics> ToPartitionStatistics(
const BinaryRowPartitionComputer* partition_computer) const;
Expand All @@ -81,5 +85,6 @@ class PartitionEntry {
int64_t file_size_in_bytes_;
int64_t file_count_;
int64_t last_file_creation_time_;
int32_t total_buckets_;
};
} // namespace paimon
16 changes: 10 additions & 6 deletions src/paimon/core/manifest/partition_entry_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,10 @@ TEST_F(PartitionEntryTest, TestSimple) {
/*total_buckets=*/2, GetDataFileMeta(Timestamp(1500, 123)))));

auto res = partition_entry1.Merge(partition_entry2);
auto expected_partition_entry = PartitionEntry(BinaryRow::EmptyRow(), /*record_count=*/10,
/*file_size_in_bytes=*/20, 2, -28798500);
auto expected_partition_entry =
PartitionEntry(BinaryRow::EmptyRow(), /*record_count=*/10,
/*file_size_in_bytes=*/20, /*file_count=*/2,
/*last_file_creation_time=*/-28798500, /*total_buckets=*/2);
ASSERT_EQ(res, expected_partition_entry);
}
{
Expand All @@ -82,8 +84,10 @@ TEST_F(PartitionEntryTest, TestSimple) {
/*total_buckets=*/2, GetDataFileMeta(Timestamp(1500, 123)))));

auto res = partition_entry1.Merge(partition_entry2);
auto expected_partition_entry = PartitionEntry(BinaryRow::EmptyRow(), /*record_count=*/0,
/*file_size_in_bytes=*/0, 0, -28798500);
auto expected_partition_entry =
PartitionEntry(BinaryRow::EmptyRow(), /*record_count=*/0,
/*file_size_in_bytes=*/0, /*file_count=*/0,
/*last_file_creation_time=*/-28798500, /*total_buckets=*/2);
ASSERT_EQ(res, expected_partition_entry);
}
}
Expand All @@ -108,10 +112,10 @@ TEST_F(PartitionEntryTest, TestMerge) {
std::unordered_map<BinaryRow, PartitionEntry> expected_partition_entry;
expected_partition_entry.emplace(
std::piecewise_construct, std::forward_as_tuple(partition0),
std::forward_as_tuple(PartitionEntry(partition0, 10, 20, 2, -28798500)));
std::forward_as_tuple(PartitionEntry(partition0, 10, 20, 2, -28798500, 2)));
expected_partition_entry.emplace(
std::piecewise_construct, std::forward_as_tuple(partition1),
std::forward_as_tuple(PartitionEntry(partition1, 5, 10, 1, -28800000)));
std::forward_as_tuple(PartitionEntry(partition1, 5, 10, 1, -28800000, 2)));
ASSERT_EQ(to, expected_partition_entry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,11 @@ TEST(AppendOnlyFileStoreScanTest, TestReadPartitionEntries) {

std::vector<PartitionEntry> expected_partition_entries = {
PartitionEntry(GenerateRow(10), /*record_count=*/9, /*file_size_in_bytes=*/1183,
/*file_count=*/2, /*last_file_creation_time=*/1721643834472l - 28800000l),
/*file_count=*/2, /*last_file_creation_time=*/1721643834472l - 28800000l,
/*total_buckets=*/2),
PartitionEntry(GenerateRow(20), /*record_count=*/2, /*file_size_in_bytes=*/1047,
/*file_count=*/2, /*last_file_creation_time=*/1721643267404l - 28800000l)};
/*file_count=*/2, /*last_file_creation_time=*/1721643267404l - 28800000l,
/*total_buckets=*/2)};
auto ComparePartitionEntryByPartition = [](const PartitionEntry& lhs,
const PartitionEntry& rhs) -> bool {
return lhs.Partition().GetInt(0) < rhs.Partition().GetInt(0);
Expand Down
6 changes: 4 additions & 2 deletions src/paimon/core/operation/file_store_commit_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,12 @@ TEST_F(FileStoreCommitImplTest, TestRESTCatalogCommit) {
std::vector<PartitionStatistics> expected_partition_statistics = {
PartitionStatistics(/*spec=*/{{"f1", "20"}}, /*record_count=*/1, /*file_size_in_bytes=*/541,
/*file_count=*/1,
/*last_file_creation_time=*/1724090888743l - 28800000l),
/*last_file_creation_time=*/1724090888743l - 28800000l,
/*total_buckets=*/-1),
PartitionStatistics(/*spec=*/{{"f1", "10"}}, /*record_count=*/4,
/*file_size_in_bytes=*/1118, /*file_count=*/2,
/*last_file_creation_time=*/1724090888727l - 28800000l)};
/*last_file_creation_time=*/1724090888727l - 28800000l,
/*total_buckets=*/-1)};
CommitTableRequest expected_commit_table_request(expected_snapshot,
expected_partition_statistics);
ASSERT_TRUE(commit_table_request.TEST_Equal(expected_commit_table_request));
Expand Down
19 changes: 15 additions & 4 deletions src/paimon/core/partition/partition_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ class PartitionStatistics : public Jsonizable<PartitionStatistics> {
public:
using SpecType = std::map<std::string, std::string>;
PartitionStatistics(const SpecType& spec, int64_t record_count, int64_t file_size_in_bytes,
int64_t file_count, int64_t last_file_creation_time)
int64_t file_count, int64_t last_file_creation_time, int32_t total_buckets)
: spec_(spec),
record_count_(record_count),
file_size_in_bytes_(file_size_in_bytes),
file_count_(file_count),
last_file_creation_time_(last_file_creation_time) {}
last_file_creation_time_(last_file_creation_time),
total_buckets_(total_buckets) {}

const SpecType& Spec() const {
return spec_;
Expand All @@ -56,6 +57,9 @@ class PartitionStatistics : public Jsonizable<PartitionStatistics> {
int64_t LastFileCreationTime() const {
return last_file_creation_time_;
}
int32_t TotalBuckets() const {
return total_buckets_;
}

rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const
noexcept(false) override {
Expand All @@ -72,6 +76,8 @@ class PartitionStatistics : public Jsonizable<PartitionStatistics> {
obj.AddMember(rapidjson::StringRef(FIELD_LAST_FILE_CREATION_TIME),
RapidJsonUtil::SerializeValue(last_file_creation_time_, allocator).Move(),
*allocator);
obj.AddMember(rapidjson::StringRef(FIELD_TOTAL_BUCKETS),
RapidJsonUtil::SerializeValue(total_buckets_, allocator).Move(), *allocator);
return obj;
}

Expand All @@ -83,12 +89,14 @@ class PartitionStatistics : public Jsonizable<PartitionStatistics> {
file_count_ = RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_FILE_COUNT);
last_file_creation_time_ =
RapidJsonUtil::DeserializeKeyValue<int64_t>(obj, FIELD_LAST_FILE_CREATION_TIME);
total_buckets_ = RapidJsonUtil::DeserializeKeyValue<int32_t>(obj, FIELD_TOTAL_BUCKETS);
}

bool operator==(const PartitionStatistics& rhs) const {
return record_count_ == rhs.record_count_ &&
file_size_in_bytes_ == rhs.file_size_in_bytes_ && file_count_ == rhs.file_count_ &&
last_file_creation_time_ == rhs.last_file_creation_time_ && spec_ == rhs.spec_;
last_file_creation_time_ == rhs.last_file_creation_time_ &&
total_buckets_ == rhs.total_buckets_ && spec_ == rhs.spec_;
}

std::string ToString() const {
Expand All @@ -102,7 +110,8 @@ class PartitionStatistics : public Jsonizable<PartitionStatistics> {
}
oss << "}, recordCount=" << record_count_ << ", fileSizeInBytes=" << file_size_in_bytes_
<< ", fileCount=" << file_count_
<< ", lastFileCreationTime=" << last_file_creation_time_ << "}";
<< ", lastFileCreationTime=" << last_file_creation_time_
<< ", totalBuckets=" << total_buckets_ << "}";
return oss.str();
}

Expand All @@ -115,12 +124,14 @@ class PartitionStatistics : public Jsonizable<PartitionStatistics> {
static constexpr const char* FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes";
static constexpr const char* FIELD_FILE_COUNT = "fileCount";
static constexpr const char* FIELD_LAST_FILE_CREATION_TIME = "lastFileCreationTime";
static constexpr const char* FIELD_TOTAL_BUCKETS = "totalBuckets";

SpecType spec_;
int64_t record_count_ = 0;
int64_t file_size_in_bytes_ = 0;
int64_t file_count_ = 0;
int64_t last_file_creation_time_ = 0;
int32_t total_buckets_ = 0;
};

} // namespace paimon
5 changes: 3 additions & 2 deletions src/paimon/core/partition/partition_statistics_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,16 @@ TEST_F(PartitionStatisticsTest, TestJsonizable) {
"recordCount": 4,
"fileSizeInBytes": 1118,
"fileCount": 2,
"lastFileCreationTime": 1724090888727
"lastFileCreationTime": 1724090888727,
"totalBuckets": 1
})";

ASSERT_OK_AND_ASSIGN(PartitionStatistics partition_statistics,
PartitionStatistics::FromJsonString(json_str));

PartitionStatistics expected_partition_statistics(
/*spec=*/{{"f1", "10"}, {"f2", "20"}}, /*record_count=*/4, /*file_size_in_bytes=*/1118,
/*file_count=*/2, /*last_file_creation_time=*/1724090888727);
/*file_count=*/2, /*last_file_creation_time=*/1724090888727, /*total_buckets=*/1);
ASSERT_EQ(expected_partition_statistics, partition_statistics);

ASSERT_OK_AND_ASSIGN(std::string new_json_str, partition_statistics.ToJsonString());
Expand Down
Loading