diff --git a/include/paimon/reader/count_reader.h b/include/paimon/reader/count_reader.h new file mode 100644 index 000000000..46b8591d1 --- /dev/null +++ b/include/paimon/reader/count_reader.h @@ -0,0 +1,33 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/result.h" +#include "paimon/visibility.h" + +namespace paimon { + +/// Reader abstraction for count queries. +class PAIMON_EXPORT CountReader { + public: + virtual ~CountReader() = default; + + /// Count rows for splits bound by the corresponding CreateCountReader call. + virtual Result CountRows() = 0; +}; + +} // namespace paimon diff --git a/include/paimon/table/source/table_read.h b/include/paimon/table/source/table_read.h index 51c1b6efb..25175f025 100644 --- a/include/paimon/table/source/table_read.h +++ b/include/paimon/table/source/table_read.h @@ -23,6 +23,7 @@ #include "paimon/memory/memory_pool.h" #include "paimon/read_context.h" #include "paimon/reader/batch_reader.h" +#include "paimon/reader/count_reader.h" #include "paimon/result.h" #include "paimon/table/source/split.h" #include "paimon/visibility.h" @@ -63,6 +64,12 @@ class PAIMON_EXPORT TableRead { virtual Result> CreateReader( const std::shared_ptr& split) = 0; + /// Creates a `CountReader` for count queries on the specified splits. + /// + /// Implementations may override this to provide a more efficient count path. 
+ virtual Result> CreateCountReader( + const std::vector>& splits); + protected: explicit TableRead(const std::shared_ptr& memory_pool); diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index 77ff23131..0d47aef95 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -264,6 +264,7 @@ set(PAIMON_CORE_SRCS core/mergetree/lookup_file.cpp core/mergetree/lookup_levels.cpp core/mergetree/lookup/remote_lookup_file_manager.cpp + core/mergetree/row_count_accumulator.cpp core/migrate/file_meta_utils.cpp core/operation/data_evolution_file_store_scan.cpp core/operation/data_evolution_split_read.cpp @@ -305,6 +306,7 @@ set(PAIMON_CORE_SRCS core/table/sink/commit_message.cpp core/table/sink/commit_message_impl.cpp core/table/sink/commit_message_serializer.cpp + core/table/source/append_count_reader.cpp core/table/source/append_only_table_read.cpp core/table/source/split.cpp core/table/source/data_split_impl.cpp @@ -312,6 +314,7 @@ set(PAIMON_CORE_SRCS core/table/source/data_table_stream_scan.cpp core/table/source/fallback_table_read.cpp core/table/source/key_value_table_read.cpp + core/table/source/pk_count_reader.cpp core/table/source/merge_tree_split_generator.cpp core/table/source/data_evolution_split_generator.cpp core/table/source/plan_impl.cpp @@ -703,6 +706,8 @@ if(PAIMON_BUILD_TESTS) core/table/sink/commit_message_impl_test.cpp core/table/source/fallback_data_split_test.cpp core/table/source/table_read_test.cpp + core/table/source/append_count_reader_test.cpp + core/table/source/pk_count_reader_test.cpp core/table/source/data_split_test.cpp core/table/source/deletion_file_test.cpp core/table/source/split_generator_test.cpp diff --git a/src/paimon/core/deletionvectors/deletion_vector.cpp b/src/paimon/core/deletionvectors/deletion_vector.cpp index d8217c03e..9ea909d39 100644 --- a/src/paimon/core/deletionvectors/deletion_vector.cpp +++ b/src/paimon/core/deletionvectors/deletion_vector.cpp @@ -16,6 +16,7 @@ #include 
"paimon/core/deletionvectors/deletion_vector.h" +#include #include #include @@ -59,6 +60,22 @@ DeletionVector::Factory DeletionVector::CreateFactory( }; } +std::unordered_map DeletionVector::CreateDeletionFileMap( + const std::vector>& data_files, + const std::vector>& deletion_files) { + std::unordered_map deletion_file_map; + if (deletion_files.empty()) { + return deletion_file_map; + } + assert(deletion_files.size() == data_files.size()); + for (size_t i = 0; i < deletion_files.size(); i++) { + if (deletion_files[i] != std::nullopt) { + deletion_file_map.emplace(data_files[i]->file_name, deletion_files[i].value()); + } + } + return deletion_file_map; +} + Result> DeletionVector::DeserializeFromBytes(const Bytes* bytes, MemoryPool* pool) { return BitmapDeletionVector::Deserialize(bytes->data(), bytes->size(), pool); diff --git a/src/paimon/core/deletionvectors/deletion_vector.h b/src/paimon/core/deletionvectors/deletion_vector.h index 6eff2fec1..161975caa 100644 --- a/src/paimon/core/deletionvectors/deletion_vector.h +++ b/src/paimon/core/deletionvectors/deletion_vector.h @@ -49,6 +49,14 @@ class DeletionVector { static Factory CreateFactory(const std::shared_ptr& dv_maintainer); + /// Builds a map from data file name to its deletion file. + /// + /// Entries whose deletion file is absent are skipped. Returns an empty map when + /// `deletion_files` is empty. + static std::unordered_map CreateDeletionFileMap( + const std::vector>& data_files, + const std::vector>& deletion_files); + virtual ~DeletionVector() = default; /// Marks the row at the specified position as deleted. 
diff --git a/src/paimon/core/deletionvectors/deletion_vector_test.cpp b/src/paimon/core/deletionvectors/deletion_vector_test.cpp index 435c75f97..d7c1bdbcc 100644 --- a/src/paimon/core/deletionvectors/deletion_vector_test.cpp +++ b/src/paimon/core/deletionvectors/deletion_vector_test.cpp @@ -19,12 +19,16 @@ #include #include #include +#include +#include #include #include #include "gtest/gtest.h" #include "paimon/core/deletionvectors/bitmap64_deletion_vector.h" #include "paimon/core/deletionvectors/bitmap_deletion_vector.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/table/source/deletion_file.h" #include "paimon/io/byte_array_input_stream.h" #include "paimon/io/byte_order.h" #include "paimon/io/data_input_stream.h" @@ -41,6 +45,16 @@ void AppendInt32BigEndian(std::vector* bytes, int32_t value) { bytes->push_back(static_cast(value & 0xFF)); } +std::shared_ptr CreateDataFileMeta(const std::string& file_name) { + return std::make_shared( + file_name, /*file_size=*/100, /*row_count=*/10, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + DataFileMeta::DUMMY_LEVEL, std::vector>{}, Timestamp(0, 0), + std::nullopt, nullptr, FileSource::Append(), std::nullopt, std::nullopt, std::nullopt, + std::nullopt); +} + } // namespace TEST(DeletionVectorTest, TestSimple) { @@ -161,4 +175,24 @@ TEST(DeletionVectorTest, ReadFromDataInputStreamInvalidMagicNumber) { "Invalid magic number"); } +TEST(DeletionVectorTest, CreateDeletionFileMap) { + std::vector> data_files = {CreateDataFileMeta("file-0.orc"), + CreateDataFileMeta("file-1.orc"), + CreateDataFileMeta("file-2.orc")}; + + auto empty_map = DeletionVector::CreateDeletionFileMap(data_files, {}); + ASSERT_TRUE(empty_map.empty()); + + DeletionFile deletion_file_0("dv-0", /*offset=*/10, /*length=*/20, /*cardinality=*/3); + DeletionFile deletion_file_2("dv-2", /*offset=*/30, 
/*length=*/40, std::nullopt); + std::vector> deletion_files = {deletion_file_0, std::nullopt, + deletion_file_2}; + + auto deletion_file_map = DeletionVector::CreateDeletionFileMap(data_files, deletion_files); + ASSERT_EQ(deletion_file_map.size(), 2); + ASSERT_EQ(deletion_file_map.at("file-0.orc"), deletion_file_0); + ASSERT_EQ(deletion_file_map.count("file-1.orc"), 0); + ASSERT_EQ(deletion_file_map.at("file-2.orc"), deletion_file_2); +} + } // namespace paimon::test diff --git a/src/paimon/core/mergetree/row_count_accumulator.cpp b/src/paimon/core/mergetree/row_count_accumulator.cpp new file mode 100644 index 000000000..a186ba5d9 --- /dev/null +++ b/src/paimon/core/mergetree/row_count_accumulator.cpp @@ -0,0 +1,66 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/mergetree/row_count_accumulator.h" + +#include + +#include "paimon/core/key_value.h" + +namespace paimon { + +RowCountAccumulator::RowCountAccumulator(std::unique_ptr&& merged_reader) + : merged_reader_(std::move(merged_reader)) {} + +Result RowCountAccumulator::CountAll() { + int64_t count = 0; + + while (true) { + // Get next batch of merged KV iterators + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr iter, + merged_reader_->NextBatch()); + if (iter == nullptr) { + // No more data + break; + } + + // Iterate through all KV objects in this batch + while (true) { + PAIMON_ASSIGN_OR_RAISE(bool has_next, iter->HasNext()); + if (!has_next) { + break; + } + + iter->Next(); + + // At this point: + // - kv has passed through SortMergeReader (deduplicated, merged) + // - kv has passed through DropDeleteReader (kind is guaranteed IsAdd()) + // - kv represents a final, valid, non-deleted row + count++; + } + } + + return count; +} + +void RowCountAccumulator::Close() { + if (merged_reader_) { + merged_reader_->Close(); + } +} + +} // namespace paimon diff --git a/src/paimon/core/mergetree/row_count_accumulator.h b/src/paimon/core/mergetree/row_count_accumulator.h new file mode 100644 index 000000000..aea41f68c --- /dev/null +++ b/src/paimon/core/mergetree/row_count_accumulator.h @@ -0,0 +1,47 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include + +#include "paimon/core/mergetree/compact/sort_merge_reader.h" +#include "paimon/result.h" + +namespace paimon { + +/// Counts rows from a merged KeyValue stream after delete rows are dropped. +class RowCountAccumulator { + public: + /// @param merged_reader The merged reader. Must be wrapped with DropDeleteReader + /// so that only valid (non-deleted) KeyValue objects are output. + explicit RowCountAccumulator(std::unique_ptr&& merged_reader); + + ~RowCountAccumulator() = default; + + /// Count all valid rows from the merge reader. + /// Iterates through all merged+deduplicated+non-deleted KeyValue objects. + Result CountAll(); + + /// Close underlying readers and release resources. + void Close(); + + private: + std::unique_ptr merged_reader_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/abstract_split_read.cpp b/src/paimon/core/operation/abstract_split_read.cpp index 5098e1729..5433335c6 100644 --- a/src/paimon/core/operation/abstract_split_read.cpp +++ b/src/paimon/core/operation/abstract_split_read.cpp @@ -102,29 +102,6 @@ bool AbstractSplitRead::NeedCompleteRowTrackingFields( } return false; } - -std::unordered_map AbstractSplitRead::CreateDeletionFileMap( - const DataSplitImpl& data_split) { - return CreateDeletionFileMap(data_split.DataFiles(), data_split.DeletionFiles()); -} - -std::unordered_map AbstractSplitRead::CreateDeletionFileMap( - const std::vector>& data_files, - const std::vector>& deletion_files) { - std::unordered_map deletion_file_map; - if (deletion_files.empty()) { - return deletion_file_map; - } - assert(deletion_files.size() == data_files.size()); - size_t file_count = deletion_files.size(); - for (size_t i = 0; i < file_count; i++) { - if (deletion_files[i] != std::nullopt) { - deletion_file_map.emplace(data_files[i]->file_name, deletion_files[i].value()); - } - } - return deletion_file_map; -} - Result> AbstractSplitRead::ApplyPredicateFilterIfNeeded( std::unique_ptr&& 
reader, const std::shared_ptr& predicate) const { if (!context_->EnablePredicateFilter() || predicate == nullptr) { diff --git a/src/paimon/core/operation/abstract_split_read.h b/src/paimon/core/operation/abstract_split_read.h index 98bc0efd1..b2df729bd 100644 --- a/src/paimon/core/operation/abstract_split_read.h +++ b/src/paimon/core/operation/abstract_split_read.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include "arrow/type_fwd.h" @@ -30,7 +29,6 @@ #include "paimon/core/operation/split_read.h" #include "paimon/core/schema/schema_manager.h" #include "paimon/core/table/source/data_split_impl.h" -#include "paimon/core/table/source/deletion_file.h" #include "paimon/core/utils/file_store_path_factory.h" #include "paimon/format/reader_builder.h" #include "paimon/reader/batch_reader.h" @@ -73,14 +71,6 @@ class AbstractSplitRead : public SplitRead { std::unique_ptr&& schema_manager, const std::shared_ptr& memory_pool, const std::shared_ptr& executor); - - static std::unordered_map CreateDeletionFileMap( - const DataSplitImpl& data_split); - - static std::unordered_map CreateDeletionFileMap( - const std::vector>& data_files, - const std::vector>& deletion_files); - Result> ApplyPredicateFilterIfNeeded( std::unique_ptr&& reader, const std::shared_ptr& predicate) const; diff --git a/src/paimon/core/operation/internal_read_context.cpp b/src/paimon/core/operation/internal_read_context.cpp index 11d147c7d..ec74c24d6 100644 --- a/src/paimon/core/operation/internal_read_context.cpp +++ b/src/paimon/core/operation/internal_read_context.cpp @@ -116,4 +116,13 @@ InternalReadContext::InternalReadContext(const std::shared_ptr& rea read_schema_(read_schema), options_(options) {} +Result> InternalReadContext::CreateWithSchema( + const std::shared_ptr& original, + const std::shared_ptr& new_read_schema) { + // Create a new InternalReadContext sharing all properties except read_schema. + // The new read_schema is the minimal column set for COUNT(*). 
+ return std::shared_ptr(new InternalReadContext( + original->read_context_, original->table_schema_, new_read_schema, original->options_)); +} + } // namespace paimon diff --git a/src/paimon/core/operation/internal_read_context.h b/src/paimon/core/operation/internal_read_context.h index 18db01e69..12b734a62 100644 --- a/src/paimon/core/operation/internal_read_context.h +++ b/src/paimon/core/operation/internal_read_context.h @@ -98,6 +98,14 @@ class InternalReadContext { return read_context_->GetCacheConfig(); } + /// Create a new InternalReadContext with a different read schema. + /// Useful for creating a context with a minimal column set for specialized reads. + /// All other settings (predicate, options, table_schema, etc.) are inherited + /// from the original context. + static Result> CreateWithSchema( + const std::shared_ptr& original, + const std::shared_ptr& new_read_schema); + private: InternalReadContext(const std::shared_ptr& read_context, const std::shared_ptr& table_schema, diff --git a/src/paimon/core/operation/merge_file_split_read.cpp b/src/paimon/core/operation/merge_file_split_read.cpp index 291b6f65b..e794a403d 100644 --- a/src/paimon/core/operation/merge_file_split_read.cpp +++ b/src/paimon/core/operation/merge_file_split_read.cpp @@ -233,8 +233,10 @@ Result> MergeFileSplitRead::ApplyIndexAndDvRead Result> MergeFileSplitRead::CreateMergeReader( const std::shared_ptr& data_split, const std::shared_ptr& data_file_path_factory) { - auto dv_factory = DeletionVector::CreateFactory(options_.GetFileSystem(), - CreateDeletionFileMap(*data_split), pool_); + auto dv_factory = DeletionVector::CreateFactory( + options_.GetFileSystem(), + DeletionVector::CreateDeletionFileMap(data_split->DataFiles(), data_split->DeletionFiles()), + pool_); std::vector> sections = IntervalPartition(data_split->DataFiles(), key_comparator_).Partition(); @@ -255,8 +257,10 @@ Result> MergeFileSplitRead::CreateMergeReader( Result> MergeFileSplitRead::CreateNoMergeReader( const 
std::shared_ptr& data_split, bool only_filter_key, const std::shared_ptr& data_file_path_factory) const { - auto dv_factory = DeletionVector::CreateFactory(options_.GetFileSystem(), - CreateDeletionFileMap(*data_split), pool_); + auto dv_factory = DeletionVector::CreateFactory( + options_.GetFileSystem(), + DeletionVector::CreateDeletionFileMap(data_split->DataFiles(), data_split->DeletionFiles()), + pool_); // create read schema without extra fields (e.g., completed key, sequence fields) auto row_kind_field = DataField::ConvertDataFieldToArrowField(SpecialFields::ValueKind()); diff --git a/src/paimon/core/operation/merge_file_split_read.h b/src/paimon/core/operation/merge_file_split_read.h index 40703d225..8dfca89d7 100644 --- a/src/paimon/core/operation/merge_file_split_read.h +++ b/src/paimon/core/operation/merge_file_split_read.h @@ -105,6 +105,11 @@ class MergeFileSplitRead : public AbstractSplitRead { return path_factory_; } + /// Get the key comparator (needed by count readers for IntervalPartition). 
+ std::shared_ptr GetKeyComparator() const { + return key_comparator_; + } + std::shared_ptr GetValueSchema() const { return value_schema_; } diff --git a/src/paimon/core/operation/raw_file_split_read.cpp b/src/paimon/core/operation/raw_file_split_read.cpp index 802c09043..79755487e 100644 --- a/src/paimon/core/operation/raw_file_split_read.cpp +++ b/src/paimon/core/operation/raw_file_split_read.cpp @@ -96,7 +96,8 @@ Result> RawFileSplitRead::CreateReader( const std::vector>& data_files, const std::vector>& deletion_files) { auto dv_factory = DeletionVector::CreateFactory( - options_.GetFileSystem(), CreateDeletionFileMap(data_files, deletion_files), pool_); + options_.GetFileSystem(), DeletionVector::CreateDeletionFileMap(data_files, deletion_files), + pool_); return CreateReader(partition, bucket, data_files, dv_factory); } diff --git a/src/paimon/core/table/source/append_count_reader.cpp b/src/paimon/core/table/source/append_count_reader.cpp new file mode 100644 index 000000000..ce467af57 --- /dev/null +++ b/src/paimon/core/table/source/append_count_reader.cpp @@ -0,0 +1,78 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/table/source/append_count_reader.h" + +#include "paimon/core/deletionvectors/deletion_vector.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/status.h" + +namespace paimon { + +Result AppendCountReader::CountRows() { + int64_t total = 0; + for (const auto& split : splits_) { + PAIMON_ASSIGN_OR_RAISE(int64_t split_count, CountSingleSplit(split)); + total += split_count; + } + return total; +} + +Result AppendCountReader::CountSingleSplit(const std::shared_ptr& split) const { + auto data_split = std::dynamic_pointer_cast(split); + if (!data_split) { + return Status::Invalid("split cannot be cast to DataSplitImpl"); + } + + if (data_split->DataFiles().empty()) { + return 0; + } + + return MetadataCount(data_split); +} + +Result AppendCountReader::MetadataCount( + const std::shared_ptr& split) const { + if (split->RawConvertible()) { + if (!file_system_ || !pool_) { + return Status::Invalid( + "file_system or memory_pool is null for DV-based append count fallback"); + } + + DeletionVector::Factory dv_factory = DeletionVector::CreateFactory( + file_system_, + DeletionVector::CreateDeletionFileMap(split->DataFiles(), split->DeletionFiles()), + pool_); + + PAIMON_ASSIGN_OR_RAISE(std::optional merged_count, + split->MergedRowCount(dv_factory)); + if (merged_count.has_value()) { + return merged_count.value(); + } + } else { + // Non-raw-convertible splits are typically produced by data evolution when multiple files + // overlap on row-id ranges. Count them through data-evolution metadata instead of using a + // deletion-vector factory for raw file counts. 
+ PAIMON_ASSIGN_OR_RAISE(std::optional merged_count, split->MergedRowCount()); + if (merged_count.has_value()) { + return merged_count.value(); + } + } + + return Status::Invalid("not support split in append count fallback"); +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/append_count_reader.h b/src/paimon/core/table/source/append_count_reader.h new file mode 100644 index 000000000..d97fdf44a --- /dev/null +++ b/src/paimon/core/table/source/append_count_reader.h @@ -0,0 +1,52 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/fs/file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/reader/count_reader.h" +#include "paimon/result.h" + +namespace paimon { +class DataSplitImpl; +class Split; + +class AppendCountReader : public CountReader { + public: + explicit AppendCountReader(std::vector> splits, + const std::shared_ptr& file_system, + const std::shared_ptr& pool) + : splits_(std::move(splits)), file_system_(file_system), pool_(pool) {} + + Result CountRows() override; + + private: + Result CountSingleSplit(const std::shared_ptr& split) const; + Result MetadataCount(const std::shared_ptr& split) const; + + private: + std::vector> splits_; + std::shared_ptr file_system_; + std::shared_ptr pool_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/source/append_count_reader_test.cpp b/src/paimon/core/table/source/append_count_reader_test.cpp new file mode 100644 index 000000000..5a211fae6 --- /dev/null +++ b/src/paimon/core/table/source/append_count_reader_test.cpp @@ -0,0 +1,143 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/table/source/append_count_reader.h" + +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/defs.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/predicate/literal.h" +#include "paimon/predicate/predicate_builder.h" +#include "paimon/scan_context.h" +#include "paimon/status.h" +#include "paimon/table/source/plan.h" +#include "paimon/table/source/split.h" +#include "paimon/table/source/table_read.h" +#include "paimon/table/source/table_scan.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +namespace { + +class DummySplit final : public Split {}; + +} // namespace + +class AppendCountReaderTest : public testing::Test { + protected: + Result>> CreateSplits( + const std::string& table_path, const std::optional& snapshot_id) { + ScanContextBuilder scan_context_builder(table_path); + if (snapshot_id.has_value()) { + scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, + std::to_string(snapshot_id.value())); + } + + PAIMON_ASSIGN_OR_RAISE(auto scan_context, scan_context_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(auto table_scan, TableScan::Create(std::move(scan_context))); + PAIMON_ASSIGN_OR_RAISE(auto plan, table_scan->CreatePlan()); + if (snapshot_id.has_value() && + (!plan->SnapshotId().has_value() || plan->SnapshotId().value() != snapshot_id)) { + return Status::Invalid("snapshot id mismatch"); + } + return plan->Splits(); + } + + std::shared_ptr file_system_ = std::make_shared(); + std::shared_ptr pool_ = GetDefaultPool(); +}; + +TEST_F(AppendCountReaderTest, TestCountRowsSnapshot1) { + std::string table_path = GetDataDir() + "/orc/append_09.db/append_09"; + + ASSERT_OK_AND_ASSIGN(auto splits, CreateSplits(table_path, /*snapshot_id=*/1)); + AppendCountReader count_reader(splits, file_system_, pool_); + + ASSERT_OK_AND_ASSIGN(int64_t count, 
count_reader.CountRows()); + ASSERT_EQ(count, 5); +} + +TEST_F(AppendCountReaderTest, TestCountRowsSnapshot5) { + std::string table_path = GetDataDir() + "/orc/append_09.db/append_09"; + + ASSERT_OK_AND_ASSIGN(auto splits, CreateSplits(table_path, /*snapshot_id=*/5)); + AppendCountReader count_reader(splits, file_system_, pool_); + + ASSERT_OK_AND_ASSIGN(int64_t count, count_reader.CountRows()); + ASSERT_EQ(count, 11); +} + +TEST_F(AppendCountReaderTest, TestCountRowsDataEvolutionTable) { + std::string table_path = + GetDataDir() + "/orc/data_evolution_with_dense_stats.db/data_evolution_with_dense_stats"; + + ASSERT_OK_AND_ASSIGN(auto splits, CreateSplits(table_path, /*snapshot_id=*/2)); + ASSERT_FALSE(splits.empty()); + + bool has_non_raw_convertible_split = false; + for (const auto& split : splits) { + auto data_split = std::dynamic_pointer_cast(split); + ASSERT_TRUE(data_split); + has_non_raw_convertible_split |= !data_split->RawConvertible(); + } + ASSERT_TRUE(has_non_raw_convertible_split); + + AppendCountReader count_reader(splits, file_system_, pool_); + + ASSERT_OK_AND_ASSIGN(int64_t count, count_reader.CountRows()); + ASSERT_EQ(count, 2); +} + +TEST_F(AppendCountReaderTest, TestCountRowsWithEmptySplits) { + std::vector> empty_splits; + AppendCountReader count_reader(empty_splits, file_system_, pool_); + + ASSERT_OK_AND_ASSIGN(int64_t count, count_reader.CountRows()); + ASSERT_EQ(count, 0); +} + +TEST_F(AppendCountReaderTest, TestCountRowsWithInvalidSplit) { + std::vector> splits = {std::make_shared()}; + AppendCountReader count_reader(splits, file_system_, pool_); + + ASSERT_NOK_WITH_MSG(count_reader.CountRows(), "split cannot be cast to DataSplitImpl"); +} + +TEST_F(AppendCountReaderTest, TestAppendOnlyTableReadCreateCountReaderPredicateNotSupported) { + std::string table_path = GetDataDir() + "/orc/append_09.db/append_09"; + + auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f3", + FieldType::DOUBLE, Literal(13.0)); + + 
ReadContextBuilder read_context_builder(table_path); + read_context_builder.SetPredicate(predicate); + ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context))); + + ASSERT_OK_AND_ASSIGN(auto splits, CreateSplits(table_path, /*snapshot_id=*/5)); + ASSERT_NOK_WITH_MSG(table_read->CreateCountReader(splits), + "predicate pushdown is not supported"); +} + +} // namespace paimon::test diff --git a/src/paimon/core/table/source/append_only_table_read.cpp b/src/paimon/core/table/source/append_only_table_read.cpp index 03bbf15ba..91665d892 100644 --- a/src/paimon/core/table/source/append_only_table_read.cpp +++ b/src/paimon/core/table/source/append_only_table_read.cpp @@ -16,10 +16,13 @@ #include "paimon/core/table/source/append_only_table_read.h" +#include + #include "paimon/core/core_options.h" #include "paimon/core/operation/data_evolution_split_read.h" #include "paimon/core/operation/internal_read_context.h" #include "paimon/core/operation/raw_file_split_read.h" +#include "paimon/core/table/source/append_count_reader.h" #include "paimon/status.h" namespace paimon { @@ -32,7 +35,7 @@ AppendOnlyTableRead::AppendOnlyTableRead(const std::shared_ptr& context, const std::shared_ptr& memory_pool, const std::shared_ptr& executor) - : TableRead(memory_pool) { + : TableRead(memory_pool), context_(context) { const auto& core_options = context->GetCoreOptions(); if (core_options.DataEvolutionEnabled()) { // add data evolution first @@ -55,4 +58,15 @@ Result> AppendOnlyTableRead::CreateReader( return Status::Invalid("create reader failed, not read match with split."); } +Result> AppendOnlyTableRead::CreateCountReader( + const std::vector>& splits) { + if (context_->GetPredicate() != nullptr) { + return Status::NotImplemented( + "CreateCountReader with predicate pushdown is not supported yet"); + } + + return std::make_unique(splits, context_->GetCoreOptions().GetFileSystem(), + 
GetMemoryPool()); +} + } // namespace paimon diff --git a/src/paimon/core/table/source/append_only_table_read.h b/src/paimon/core/table/source/append_only_table_read.h index fc1282fff..951786deb 100644 --- a/src/paimon/core/table/source/append_only_table_read.h +++ b/src/paimon/core/table/source/append_only_table_read.h @@ -44,8 +44,12 @@ class AppendOnlyTableRead : public TableRead { Result> CreateReader( const std::shared_ptr& data_split) override; + Result> CreateCountReader( + const std::vector>& splits) override; + private: std::vector> split_reads_; + std::shared_ptr context_; }; } // namespace paimon diff --git a/src/paimon/core/table/source/data_split_impl.cpp b/src/paimon/core/table/source/data_split_impl.cpp index 1cc91861a..ca274fd77 100644 --- a/src/paimon/core/table/source/data_split_impl.cpp +++ b/src/paimon/core/table/source/data_split_impl.cpp @@ -21,6 +21,9 @@ #include #include +#include "paimon/common/utils/range_helper.h" +#include "paimon/core/deletionvectors/deletion_vector.h" + namespace paimon { bool DataSplit::SimpleDataFileMeta::operator==(const SimpleDataFileMeta& other) const { @@ -109,18 +112,83 @@ bool DataSplitImpl::TEST_Equal(const DataSplitImpl& other) const { is_streaming_ == other.is_streaming_ && raw_convertible_ == other.raw_convertible_; } -int64_t DataSplitImpl::PartialMergedRowCount() const { +Result> DataSplitImpl::MergedRowCount() const { + return MergedRowCount(nullptr); +} + +Result> DataSplitImpl::MergedRowCount( + DeletionVector::Factory dv_factory) const { + PAIMON_ASSIGN_OR_RAISE(std::optional raw_merged_row_count, + RawMergedRowCount(dv_factory)); + if (raw_merged_row_count.has_value()) { + return raw_merged_row_count; + } + if (DataEvolutionRowCountAvailable()) { + PAIMON_ASSIGN_OR_RAISE(int64_t merged_row_count, DataEvolutionMergedRowCount()); + return std::optional(merged_row_count); + } + return std::optional(); +} + +Result> DataSplitImpl::RawMergedRowCount( + DeletionVector::Factory dv_factory) const { if 
(!raw_convertible_) { - return 0; + return std::optional(); } + int64_t sum = 0; for (size_t i = 0; i < data_files_.size(); i++) { const auto& data_file = data_files_[i]; - if (data_deletion_files_.empty() || data_deletion_files_[i] == std::nullopt) { + const std::optional deletion_file = + data_deletion_files_.empty() ? std::nullopt : data_deletion_files_[i]; + if (deletion_file == std::nullopt) { sum += data_file->row_count; - } else if (data_deletion_files_[i].value().cardinality != std::nullopt) { - sum += data_file->row_count - data_deletion_files_[i].value().cardinality.value(); + } else if (deletion_file.value().cardinality != std::nullopt) { + sum += data_file->row_count - deletion_file.value().cardinality.value(); + } else { + if (!dv_factory) { + return std::optional(); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr deletion_vector, + dv_factory(data_file->file_name)); + if (!deletion_vector) { + return Status::Invalid( + "deletion vector not found for file with missing cardinality"); + } + sum += data_file->row_count - deletion_vector->GetCardinality(); + } + } + + return std::optional(sum); +} + +bool DataSplitImpl::DataEvolutionRowCountAvailable() const { + return std::all_of(data_files_.begin(), data_files_.end(), + [](const std::shared_ptr& file) { + return file->first_row_id != std::nullopt; + }); +} + +Result DataSplitImpl::DataEvolutionMergedRowCount() const { + std::vector> files = data_files_; + RangeHelper> range_helper( + [](const std::shared_ptr& meta) -> Result { + return meta->first_row_id.value(); + }, + [](const std::shared_ptr& meta) -> Result { + return meta->first_row_id.value() + meta->row_count - 1; + }); + + PAIMON_ASSIGN_OR_RAISE(std::vector>> ranges, + range_helper.MergeOverlappingRanges(std::move(files))); + + int64_t sum = 0; + for (const auto& group : ranges) { + int64_t max_count = 0; + for (const auto& file : group) { + max_count = std::max(max_count, file->row_count); } + sum += max_count; } return sum; } diff --git 
a/src/paimon/core/table/source/data_split_impl.h b/src/paimon/core/table/source/data_split_impl.h index e0b54af1a..46317f382 100644 --- a/src/paimon/core/table/source/data_split_impl.h +++ b/src/paimon/core/table/source/data_split_impl.h @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include #include @@ -25,6 +26,7 @@ #include "paimon/common/utils/path_util.h" #include "paimon/common/utils/preconditions.h" #include "paimon/common/utils/string_utils.h" +#include "paimon/core/deletionvectors/deletion_vector.h" #include "paimon/core/io/data_file_meta_09_serializer.h" #include "paimon/core/io/data_file_meta_10_serializer.h" #include "paimon/core/io/data_file_meta_12_serializer.h" @@ -34,6 +36,7 @@ #include "paimon/table/source/data_split.h" namespace paimon { + /// Input splits. Needed by most batch computation engines. class DataSplitImpl : public DataSplit { public: @@ -93,14 +96,22 @@ class DataSplitImpl : public DataSplit { bool operator==(const DataSplitImpl& other) const; bool TEST_Equal(const DataSplitImpl& other) const; - /// Obtain merged row count as much as possible. There are two scenarios where accurate row - /// count - /// can be calculated: + /// Obtain merged row count when metadata is sufficient. + /// + /// This method follows Java DataSplit#mergedRowCount behavior: /// - /// 1. raw file and no deletion file. + /// 1. Prefer raw merged row count when split is raw-convertible and deletion cardinality is + /// available. + /// 2. Fallback to data-evolution merged row count when all files have first_row_id. /// - /// 2. raw file + deletion file with cardinality. - int64_t PartialMergedRowCount() const; + /// Obtain merged row count without reading deletion vector files for missing cardinality. + Result> MergedRowCount() const; + + /// Obtain merged row count with a deletion vector factory for missing cardinality. 
+ /// + /// When a deletion file exists but its cardinality metadata is missing, the factory can be + /// used to read the deletion vector file and provide exact cardinality. + Result> MergedRowCount(DeletionVector::Factory dv_factory) const; // Builder /// Builder for `DataSplitImpl`. @@ -174,6 +185,10 @@ class DataSplitImpl : public DataSplit { bucket_path_(bucket_path), data_files_(std::move(data_files)) {} + Result> RawMergedRowCount(DeletionVector::Factory dv_factory) const; + bool DataEvolutionRowCountAvailable() const; + Result DataEvolutionMergedRowCount() const; + private: int64_t snapshot_id_ = 0; BinaryRow partition_ = BinaryRow::EmptyRow(); diff --git a/src/paimon/core/table/source/data_split_test.cpp b/src/paimon/core/table/source/data_split_test.cpp index 6f4d0645d..e34ee26bf 100644 --- a/src/paimon/core/table/source/data_split_test.cpp +++ b/src/paimon/core/table/source/data_split_test.cpp @@ -25,6 +25,8 @@ #include "gtest/gtest.h" #include "paimon/common/data/binary_row.h" #include "paimon/common/data/data_define.h" +#include "paimon/core/deletionvectors/bitmap_deletion_vector.h" +#include "paimon/core/deletionvectors/deletion_vector.h" #include "paimon/core/io/data_file_meta.h" #include "paimon/core/manifest/file_source.h" #include "paimon/core/stats/simple_stats.h" @@ -412,7 +414,9 @@ TEST(DataSplitTest, TestDeserializeVersion5PkWithEmptyExternalPath) { builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build().value()); ASSERT_EQ(*result_data_split, *expected_data_split); - ASSERT_EQ(5, expected_data_split->PartialMergedRowCount()); + ASSERT_OK_AND_ASSIGN(std::optional merged_row_count, + expected_data_split->MergedRowCount()); + ASSERT_EQ(std::optional(5), merged_row_count); ASSERT_OK(Split::Serialize(result_data_split, pool)); } @@ -474,7 +478,9 @@ TEST(DataSplitTest, TestDeserializeVersion4PkWithSnapshot4WithDvCardinality) { .value()); ASSERT_EQ(*result_data_split, *expected_data_split); - ASSERT_EQ(5, 
expected_data_split->PartialMergedRowCount()); + ASSERT_OK_AND_ASSIGN(std::optional merged_row_count, + expected_data_split->MergedRowCount()); + ASSERT_EQ(std::optional(5), merged_row_count); } TEST(DataSplitTest, TestDeserializeVersion3AppendWithSnapshot1) { @@ -813,7 +819,9 @@ TEST(DataSplitTest, TestDeserializePkWithSnapshot6OfSingleFile) { .Build() .value()); ASSERT_EQ(*result_data_split, *expected_data_split); - ASSERT_EQ(0, expected_data_split->PartialMergedRowCount()); + ASSERT_OK_AND_ASSIGN(std::optional merged_row_count, + expected_data_split->MergedRowCount()); + ASSERT_EQ(std::nullopt, merged_row_count); } TEST(DataSplitTest, TestDeserializePkWithSnapshot6OfMultiFiles) { @@ -1048,16 +1056,8 @@ TEST(DataSplitTest, TestDeserializePk10WithSnapshot6) { TEST(DataSplitTest, TestPartialMergedRowCount) { auto pool = GetDefaultPool(); auto file_meta = std::make_shared( - "data-0.orc", /*file_size=*/100, /*row_count=*/2, - /*min_key=*/ - BinaryRowGenerator::GenerateRow({std::string("Alice"), 1}, pool.get()), /*max_key=*/ - BinaryRowGenerator::GenerateRow({std::string("David"), 1}, pool.get()), - /*key_stats=*/ - BinaryRowGenerator::GenerateStats({std::string("Alice"), 1}, {std::string("David"), 1}, - {0, 0}, pool.get()), /*value_stats=*/ - BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1, 11.0}, - {std::string("David"), 10, 1, 11.1}, {0, 0, 0, 0}, - pool.get()), + "data-0.orc", /*file_size=*/100, /*row_count=*/2, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/0, /*level=*/0, /*extra_files=*/std::vector>(), /*creation_time=*/Timestamp(1725562946338ll, 0), @@ -1066,16 +1066,8 @@ TEST(DataSplitTest, TestPartialMergedRowCount) { /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); auto file_meta2 = std::make_shared( - "data-1.orc", /*file_size=*/100, /*row_count=*/2, - /*min_key=*/ - 
BinaryRowGenerator::GenerateRow({std::string("Bob"), 1}, pool.get()), /*max_key=*/ - BinaryRowGenerator::GenerateRow({std::string("David"), 1}, pool.get()), - /*key_stats=*/ - BinaryRowGenerator::GenerateStats({std::string("Bob"), 1}, {std::string("David"), 1}, - {0, 0}, pool.get()), /*value_stats=*/ - BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 1, 11.0}, - {std::string("David"), 10, 1, 11.1}, {0, 0, 0, 0}, - pool.get()), + "data-1.orc", /*file_size=*/100, /*row_count=*/2, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), /*min_sequence_number=*/2, /*max_sequence_number=*/3, /*schema_id=*/0, /*level=*/0, /*extra_files=*/std::vector>(), /*creation_time=*/Timestamp(1725562947338ll, 0), @@ -1095,7 +1087,9 @@ TEST(DataSplitTest, TestPartialMergedRowCount) { .RawConvertible(false) .Build() .value()); - ASSERT_EQ(0, expected_data_split->PartialMergedRowCount()); + ASSERT_OK_AND_ASSIGN(std::optional merged_row_count, + expected_data_split->MergedRowCount()); + ASSERT_EQ(std::nullopt, merged_row_count); ASSERT_EQ(4, expected_data_split->RowCount()); ASSERT_OK_AND_ASSIGN(auto latest_epoch, expected_data_split->LatestFileCreationEpochMillis()); @@ -1103,6 +1097,299 @@ TEST(DataSplitTest, TestPartialMergedRowCount) { ASSERT_EQ(file_meta2->CreationTimeEpochMillis().value(), latest_epoch.value()); } +TEST(DataSplitTest, TestPartialMergedRowCountRawConvertibleWithoutDeletionFiles) { + auto pool = GetDefaultPool(); + auto file_meta1 = std::make_shared( + "data-0.orc", /*file_size=*/100, /*row_count=*/3, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562946338ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, 
/*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + auto file_meta2 = std::make_shared( + "data-1.orc", /*file_size=*/100, /*row_count=*/4, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/3, /*max_sequence_number=*/6, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562947338ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder( + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/0, /*bucket_path=*/"fake_table/f1=10/bucket-0", {file_meta1, file_meta2}); + + auto data_split = std::dynamic_pointer_cast( + builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build().value()); + + // Java parity: rawConvertible and deletion files absent should return row_count sum. 
+ ASSERT_OK_AND_ASSIGN(std::optional merged_row_count, data_split->MergedRowCount()); + ASSERT_EQ(std::optional(7), merged_row_count); +} + +TEST(DataSplitTest, TestPartialMergedRowCountRawConvertibleWithCardinality) { + auto pool = GetDefaultPool(); + auto file_meta1 = std::make_shared( + "data-0.orc", /*file_size=*/100, /*row_count=*/7, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/6, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562946338ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + auto file_meta2 = std::make_shared( + "data-1.orc", /*file_size=*/100, /*row_count=*/2, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/7, /*max_sequence_number=*/8, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562947338ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + auto file_meta3 = std::make_shared( + "data-2.orc", /*file_size=*/100, /*row_count=*/3, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/9, /*max_sequence_number=*/11, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562948338ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + 
/*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder( + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/0, /*bucket_path=*/"fake_table/f1=10/bucket-0", + {file_meta1, file_meta2, file_meta3}); + + auto data_split = std::dynamic_pointer_cast( + builder.WithSnapshot(1) + .WithDataDeletionFiles({std::nullopt, + DeletionFile("fake/index-0", /*offset=*/1, /*length=*/22, + /*cardinality=*/2), + DeletionFile("fake/index-0", /*offset=*/31, /*length=*/22, + /*cardinality=*/1)}) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + // Java parity: null + cardinality + cardinality => (7 + (2 - 2) + (3 - 1)) = 9. + ASSERT_OK_AND_ASSIGN(std::optional merged_row_count, data_split->MergedRowCount()); + ASSERT_EQ(std::optional(9), merged_row_count); +} + +TEST(DataSplitTest, TestPartialMergedRowCountMixedCardinalityReturnsNullopt) { + auto pool = GetDefaultPool(); + auto file_meta1 = std::make_shared( + "data-0.orc", /*file_size=*/100, /*row_count=*/7, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/6, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562946338ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + auto file_meta2 = std::make_shared( + "data-1.orc", /*file_size=*/100, /*row_count=*/2, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/7, /*max_sequence_number=*/8, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562947338ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, 
/*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder( + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/0, /*bucket_path=*/"fake_table/f1=10/bucket-0", {file_meta1, file_meta2}); + + auto data_split = std::dynamic_pointer_cast( + builder.WithSnapshot(1) + .WithDataDeletionFiles({DeletionFile("fake/index-0", /*offset=*/1, /*length=*/22, + /*cardinality=*/2), + DeletionFile("fake/index-0", /*offset=*/31, /*length=*/22, + /*cardinality=*/std::nullopt)}) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + // If any deletion file misses cardinality, metadata count is unknown and must not be partial. + ASSERT_OK_AND_ASSIGN(std::optional merged_row_count, data_split->MergedRowCount()); + ASSERT_EQ(std::nullopt, merged_row_count); +} + +TEST(DataSplitTest, TestPartialMergedRowCountUnknownDeleteRowCountDoesNotBlockRawCount) { + auto pool = GetDefaultPool(); + auto file_meta1 = std::make_shared( + "data-0.orc", /*file_size=*/100, /*row_count=*/3, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562946338ll, 0), + /*delete_row_count=*/std::nullopt, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + auto file_meta2 = std::make_shared( + "data-1.orc", /*file_size=*/100, /*row_count=*/2, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/3, /*max_sequence_number=*/4, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562947338ll, 0), + /*delete_row_count=*/0, 
/*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder( + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/0, /*bucket_path=*/"fake_table/f1=10/bucket-0", {file_meta1, file_meta2}); + + auto data_split = std::dynamic_pointer_cast( + builder.WithSnapshot(1) + .WithDataDeletionFiles({std::nullopt, std::nullopt}) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + ASSERT_OK_AND_ASSIGN(std::optional merged_row_count, data_split->MergedRowCount()); + ASSERT_EQ(std::optional(5), merged_row_count); +} + +TEST(DataSplitTest, TestPartialMergedRowCountFallsBackToDataEvolution) { + auto pool = GetDefaultPool(); + auto file_meta1 = std::make_shared( + "data-0.orc", /*file_size=*/100, /*row_count=*/3, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562946338ll, 0), + /*delete_row_count=*/std::nullopt, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/100, + /*write_cols=*/std::nullopt); + auto file_meta2 = std::make_shared( + "data-1.orc", /*file_size=*/100, /*row_count=*/5, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/3, /*max_sequence_number=*/7, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562947338ll, 0), + /*delete_row_count=*/std::nullopt, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/100, + /*write_cols=*/std::nullopt); + 
auto file_meta3 = std::make_shared( + "data-2.orc", /*file_size=*/100, /*row_count=*/2, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/8, /*max_sequence_number=*/9, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562948338ll, 0), + /*delete_row_count=*/std::nullopt, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/200, + /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder( + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/0, /*bucket_path=*/"fake_table/f1=10/bucket-0", + {file_meta1, file_meta2, file_meta3}); + + auto data_split = std::dynamic_pointer_cast( + builder.WithSnapshot(1) + .WithDataDeletionFiles({std::nullopt, std::nullopt, std::nullopt}) + .IsStreaming(false) + .RawConvertible(false) + .Build() + .value()); + + // [100,102] and [100,104] overlap -> max row_count is 5; [200,201] is separate -> 2. + ASSERT_OK_AND_ASSIGN(std::optional merged_row_count, data_split->MergedRowCount()); + ASSERT_EQ(std::optional(7), merged_row_count); +} + +// Covers the refactored path where a deletion file exists but its cardinality metadata is +// missing (nullopt). In that case MergedRowCount must call the provided dv_factory to read the +// deletion vector and derive the exact cardinality, instead of returning nullopt. 
+TEST(DataSplitTest, TestPartialMergedRowCountResolvesMissingCardinalityViaFactory) { + auto pool = GetDefaultPool(); + auto file_meta1 = std::make_shared( + "data-0.orc", /*file_size=*/100, /*row_count=*/7, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/0, /*max_sequence_number=*/6, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562946338ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + auto file_meta2 = std::make_shared( + "data-1.orc", /*file_size=*/100, /*row_count=*/5, DataFileMeta::EmptyMinKey(), + DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), + /*min_sequence_number=*/7, /*max_sequence_number=*/11, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1725562947338ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder( + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/0, /*bucket_path=*/"fake_table/f1=10/bucket-0", {file_meta1, file_meta2}); + + // The second data file has a deletion file whose cardinality metadata is missing (nullopt), + // which forces resolution through the dv_factory. + auto data_split = std::dynamic_pointer_cast( + builder.WithSnapshot(1) + .WithDataDeletionFiles({std::nullopt, DeletionFile("fake/index-0", /*offset=*/0, + /*length=*/22, /*cardinality=*/ + std::nullopt)}) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + // Without a factory, the missing cardinality keeps the result unknown. 
+ ASSERT_OK_AND_ASSIGN(std::optional merged_without_factory, + data_split->MergedRowCount()); + ASSERT_EQ(std::nullopt, merged_without_factory); + + // Build a deletion vector with 2 deleted rows; the factory returns it for the data file whose + // cardinality is missing. + RoaringBitmap32 roaring; + roaring.Add(1); + roaring.Add(3); + auto deletion_vector = std::make_shared(roaring); + ASSERT_EQ(2, deletion_vector->GetCardinality()); + + DeletionVector::Factory dv_factory = + [&deletion_vector]( + const std::string& file_name) -> Result> { + if (file_name == "data-1.orc") { + return std::static_pointer_cast(deletion_vector); + } + return std::shared_ptr(); + }; + + // file_meta1: 7 (no deletion file) + file_meta2: 5 - factory_cardinality(2) = 10. + ASSERT_OK_AND_ASSIGN(std::optional merged_with_factory, + data_split->MergedRowCount(dv_factory)); + ASSERT_EQ(std::optional(10), merged_with_factory); +} + TEST(DataSplitTest, TestRowCountAndLatestFileCreationEpochMillisEmpty) { DataSplitImpl::Builder builder( /*partition=*/BinaryRow::EmptyRow(), diff --git a/src/paimon/core/table/source/data_table_batch_scan.cpp b/src/paimon/core/table/source/data_table_batch_scan.cpp index 80e1eb58a..0a441f63b 100644 --- a/src/paimon/core/table/source/data_table_batch_scan.cpp +++ b/src/paimon/core/table/source/data_table_batch_scan.cpp @@ -80,9 +80,14 @@ Result> DataTableBatchScan::ApplyPushDownLimit( return Status::Invalid("DataSplit cannot cast to DataSplitImpl"); } if (data_split->RawConvertible()) { - int64_t partial_merged_row_count = data_split->PartialMergedRowCount(); + PAIMON_ASSIGN_OR_RAISE(std::optional partial_merged_row_count, + data_split->MergedRowCount()); + if (!partial_merged_row_count.has_value()) { + // Cannot safely estimate split rows from metadata; skip push-down limit. 
+ return current_scan_result->GetPlan(); + } limited_data_splits.emplace_back(data_split); - scanned_row_count += partial_merged_row_count; + scanned_row_count += partial_merged_row_count.value(); if (scanned_row_count >= push_down_limit_.value()) { PAIMON_ASSIGN_OR_RAISE(int64_t snapshot_id, current_scan_result->SnapshotId()); return std::make_shared(snapshot_id, limited_data_splits); diff --git a/src/paimon/core/table/source/key_value_table_read.cpp b/src/paimon/core/table/source/key_value_table_read.cpp index a6c53e5fb..2d14638f6 100644 --- a/src/paimon/core/table/source/key_value_table_read.cpp +++ b/src/paimon/core/table/source/key_value_table_read.cpp @@ -20,6 +20,8 @@ #include "paimon/core/operation/merge_file_split_read.h" #include "paimon/core/operation/raw_file_split_read.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/core/table/source/pk_count_reader.h" #include "paimon/status.h" namespace paimon { @@ -30,8 +32,15 @@ class InternalReadContext; class MemoryPool; KeyValueTableRead::KeyValueTableRead(std::vector>&& split_reads, - const std::shared_ptr& memory_pool) - : TableRead(memory_pool), split_reads_(std::move(split_reads)) {} + const std::shared_ptr& path_factory, + const std::shared_ptr& context, + const std::shared_ptr& memory_pool, + const std::shared_ptr& executor) + : TableRead(memory_pool), + split_reads_(std::move(split_reads)), + path_factory_(path_factory), + context_(context), + executor_(executor) {} Result> KeyValueTableRead::Create( const std::shared_ptr& path_factory, @@ -46,7 +55,8 @@ Result> KeyValueTableRead::Create( MergeFileSplitRead::Create(path_factory, context, memory_pool, executor)); split_reads.emplace_back(std::move(merge_file_split_read)); - return std::unique_ptr(new KeyValueTableRead(std::move(split_reads), memory_pool)); + return std::unique_ptr(new KeyValueTableRead(std::move(split_reads), path_factory, + context, memory_pool, executor)); } void KeyValueTableRead::ForceKeepDelete(bool 
force_keep_delete) { @@ -74,4 +84,22 @@ Result> KeyValueTableRead::CreateReader( return Status::Invalid("create reader failed, not read match with data split."); } +Result> KeyValueTableRead::CreateCountReader( + const std::vector>& splits) { + if (context_->GetPredicate() != nullptr) { + return Status::NotImplemented( + "CreateCountReader with predicate pushdown is not supported yet"); + } + + if (force_keep_delete_) { + return Status::NotImplemented("CreateCountReader with force_keep_delete is not supported"); + } + + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr pk_count_reader, + PKCountReader::Create(splits, path_factory_, context_, GetMemoryPool(), executor_)); + + return pk_count_reader; +} + } // namespace paimon diff --git a/src/paimon/core/table/source/key_value_table_read.h b/src/paimon/core/table/source/key_value_table_read.h index a34486bae..00abd0934 100644 --- a/src/paimon/core/table/source/key_value_table_read.h +++ b/src/paimon/core/table/source/key_value_table_read.h @@ -42,13 +42,22 @@ class KeyValueTableRead : public TableRead { Result> CreateReader(const std::shared_ptr& split) override; + Result> CreateCountReader( + const std::vector>& splits) override; + void ForceKeepDelete(bool force_keep_delete); private: KeyValueTableRead(std::vector>&& split_reads, - const std::shared_ptr& memory_pool); + const std::shared_ptr& path_factory, + const std::shared_ptr& context, + const std::shared_ptr& memory_pool, + const std::shared_ptr& executor); std::vector> split_reads_; + std::shared_ptr path_factory_; + std::shared_ptr context_; + std::shared_ptr executor_; bool force_keep_delete_ = false; }; diff --git a/src/paimon/core/table/source/pk_count_reader.cpp b/src/paimon/core/table/source/pk_count_reader.cpp new file mode 100644 index 000000000..5c40af308 --- /dev/null +++ b/src/paimon/core/table/source/pk_count_reader.cpp @@ -0,0 +1,140 @@ +/* + * Copyright 2026-present Alibaba Inc. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/table/source/pk_count_reader.h" + +#include +#include +#include + +#include "arrow/c/abi.h" +#include "arrow/type.h" +#include "paimon/common/types/data_field.h" +#include "paimon/core/deletionvectors/deletion_vector.h" +#include "paimon/core/mergetree/compact/interval_partition.h" +#include "paimon/core/mergetree/row_count_accumulator.h" +#include "paimon/core/operation/internal_read_context.h" +#include "paimon/core/operation/merge_file_split_read.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/status.h" + +namespace paimon { + +PKCountReader::~PKCountReader() = default; + +Result> PKCountReader::Create( + std::vector> splits, + const std::shared_ptr& path_factory, + const std::shared_ptr& context, + const std::shared_ptr& memory_pool, const std::shared_ptr& executor) { + const auto& table_schema = context->GetTableSchema(); + PAIMON_ASSIGN_OR_RAISE(std::vector pk_fields, + table_schema->TrimmedPrimaryKeyFields()); + std::shared_ptr count_read_schema = + DataField::ConvertDataFieldsToArrowSchema(pk_fields); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr count_context, + InternalReadContext::CreateWithSchema(context, count_read_schema)); + + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr merge_read, + MergeFileSplitRead::Create(path_factory, count_context, memory_pool, 
executor)); + + return std::unique_ptr( + new PKCountReader(std::move(splits), count_context, std::move(merge_read), memory_pool)); +} + +Result PKCountReader::CountRows() { + int64_t total = 0; + for (const auto& split : splits_) { + PAIMON_ASSIGN_OR_RAISE(int64_t split_count, CountSingleSplit(split)); + total += split_count; + } + return total; +} + +PKCountReader::PKCountReader(std::vector> splits, + const std::shared_ptr& context, + std::unique_ptr&& merge_read, + const std::shared_ptr& memory_pool) + : splits_(std::move(splits)), + context_(context), + merge_read_(std::move(merge_read)), + pool_(memory_pool) {} + +Result PKCountReader::CountSingleSplit(const std::shared_ptr& split) { + auto data_split = std::dynamic_pointer_cast(split); + if (!data_split) { + return Status::Invalid("split cannot be cast to DataSplitImpl"); + } + + if (data_split->DataFiles().empty()) { + return 0; + } + + if (data_split->RawConvertible()) { + return MetadataCount(data_split); + } + + return MergeCount(data_split); +} + +Result PKCountReader::MetadataCount(const std::shared_ptr& split) { + DeletionVector::Factory dv_factory = DeletionVector::CreateFactory( + context_->GetCoreOptions().GetFileSystem(), + DeletionVector::CreateDeletionFileMap(split->DataFiles(), split->DeletionFiles()), pool_); + + PAIMON_ASSIGN_OR_RAISE(std::optional count, split->MergedRowCount(dv_factory)); + if (count.has_value()) { + return count.value(); + } + + return Status::Invalid("not support split in pk count metadata fallback"); +} + +Result PKCountReader::MergeCount(const std::shared_ptr& split) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_file_path_factory, + merge_read_->GetPathFactory()->CreateDataFilePathFactory( + split->Partition(), split->Bucket())); + + auto dv_factory = DeletionVector::CreateFactory( + context_->GetCoreOptions().GetFileSystem(), + DeletionVector::CreateDeletionFileMap(split->DataFiles(), split->DeletionFiles()), pool_); + + std::vector> sections = + 
IntervalPartition(split->DataFiles(), merge_read_->GetKeyComparator()).Partition(); + + int64_t total_count = 0; + + for (const auto& section : sections) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr merged_reader, + merge_read_->CreateSortMergeReaderForSection( + section, split->Partition(), dv_factory, + /*predicate=*/nullptr, data_file_path_factory, + /*drop_delete=*/true)); + + RowCountAccumulator accumulator(std::move(merged_reader)); + PAIMON_ASSIGN_OR_RAISE(int64_t section_count, accumulator.CountAll()); + total_count += section_count; + accumulator.Close(); + } + + return total_count; +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/pk_count_reader.h b/src/paimon/core/table/source/pk_count_reader.h new file mode 100644 index 000000000..c81d23a8c --- /dev/null +++ b/src/paimon/core/table/source/pk_count_reader.h @@ -0,0 +1,68 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include + +#include "paimon/reader/count_reader.h" +#include "paimon/result.h" + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { +class DataSplitImpl; +class Executor; +class FileStorePathFactory; +class InternalReadContext; +class MemoryPool; +class MergeFileSplitRead; +class Split; + +class PKCountReader : public CountReader { + public: + ~PKCountReader() override; + + static Result> Create( + std::vector> splits, + const std::shared_ptr& path_factory, + const std::shared_ptr& context, + const std::shared_ptr& memory_pool, const std::shared_ptr& executor); + + Result CountRows() override; + + private: + PKCountReader(std::vector> splits, + const std::shared_ptr& context, + std::unique_ptr&& merge_read, + const std::shared_ptr& memory_pool); + + Result CountSingleSplit(const std::shared_ptr& split); + Result MetadataCount(const std::shared_ptr& split); + Result MergeCount(const std::shared_ptr& split); + + private: + std::vector> splits_; + std::shared_ptr context_; + std::unique_ptr merge_read_; + std::shared_ptr pool_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/source/pk_count_reader_test.cpp b/src/paimon/core/table/source/pk_count_reader_test.cpp new file mode 100644 index 000000000..bcdaaaf1f --- /dev/null +++ b/src/paimon/core/table/source/pk_count_reader_test.cpp @@ -0,0 +1,172 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/table/source/pk_count_reader.h"
+
+#include
+#include
+#include
+#include
+
+#include "gtest/gtest.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/core/operation/internal_read_context.h"
+#include "paimon/core/schema/schema_manager.h"
+#include "paimon/core/table/source/data_split_impl.h"
+#include "paimon/core/utils/file_store_path_factory.h"
+#include "paimon/defs.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/read_context.h"
+#include "paimon/scan_context.h"
+#include "paimon/table/source/plan.h"
+#include "paimon/table/source/split.h"
+#include "paimon/table/source/table_scan.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+namespace {
+
+// A Split subclass that adds nothing and is not a DataSplitImpl.
+// NOTE(review): presumably the split fed to TestCountRowsWithInvalidSplit to trigger the
+// "cannot be cast to DataSplitImpl" error - confirm at the use site.
+class DummySplit final : public Split {};
+
+}  // namespace
+
+// Fixture with helpers that build the inputs PKCountReader::Create() needs from a table path.
+class PKCountReaderTest : public testing::Test {
+ protected:
+    // Plans a scan of `table_path` and returns the resulting splits.
+    // When `snapshot_id` is set, it is passed as Options::SCAN_SNAPSHOT_ID and the plan's
+    // snapshot id is verified against it; a mismatch returns Status::Invalid.
+    Result>> CreateSplits(
+        const std::string& table_path, const std::optional& snapshot_id) {
+        ScanContextBuilder scan_context_builder(table_path);
+        if (snapshot_id.has_value()) {
+            scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID,
+                                           std::to_string(snapshot_id.value()));
+        }
+
+        PAIMON_ASSIGN_OR_RAISE(auto scan_context, scan_context_builder.Finish());
+        PAIMON_ASSIGN_OR_RAISE(auto table_scan, TableScan::Create(std::move(scan_context)));
+        PAIMON_ASSIGN_OR_RAISE(auto plan, table_scan->CreatePlan());
+        if (snapshot_id.has_value()) {
+            // Guard against the scan silently planning a different snapshot than requested.
+            if (!plan->SnapshotId().has_value() || plan->SnapshotId().value() != snapshot_id) {
+                return Status::Invalid("snapshot id mismatch");
+            }
+        }
+        return plan->Splits();
+    }
+
+    // Builds an InternalReadContext for `table_path` from a default ReadContext, the schema
+    // read with id 0 via SchemaManager, and that schema's options.
+    Result> CreateInternalContext(
+        const std::string& table_path) {
+        ReadContextBuilder read_context_builder(table_path);
+        PAIMON_ASSIGN_OR_RAISE(auto read_context, read_context_builder.Finish());
+
+        SchemaManager schema_manager(std::make_shared(), table_path);
+        PAIMON_ASSIGN_OR_RAISE(auto table_schema, schema_manager.ReadSchema(0));
+        PAIMON_ASSIGN_OR_RAISE(auto internal_context,
+                               InternalReadContext::Create(std::move(read_context), table_schema,
+                                                           table_schema->Options()));
+        return std::shared_ptr(std::move(internal_context));
+    }
+
+    // Builds a FileStorePathFactory from the context's path, table schema and core options.
+    Result> CreatePathFactory(
+        const std::shared_ptr& internal_context) {
+        const auto& core_options = internal_context->GetCoreOptions();
+        auto arrow_schema =
+            DataField::ConvertDataFieldsToArrowSchema(internal_context->GetTableSchema()->Fields());
+
+        PAIMON_ASSIGN_OR_RAISE(std::vector external_paths,
+                               core_options.CreateExternalPaths());
+        PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path,
+                               core_options.CreateGlobalIndexExternalPath());
+
+        PAIMON_ASSIGN_OR_RAISE(
+            auto path_factory,
+            FileStorePathFactory::Create(
+                internal_context->GetPath(), arrow_schema,
+                internal_context->GetTableSchema()->PartitionKeys(),
+                core_options.GetPartitionDefaultName(), core_options.GetFileFormat()->Identifier(),
+                core_options.DataFilePrefix(), core_options.LegacyPartitionNameEnabled(),
+                external_paths, global_index_external_path, core_options.IndexFileInDataFileDir(),
+                pool_));
+
+        return std::shared_ptr(std::move(path_factory));
+    }
+
+ protected:
+    // Memory pool shared by the path factory and the readers under test.
+    std::shared_ptr pool_ = GetDefaultPool();
+};
+
+// Counts the splits planned at snapshot 5 of the ORC pk_table_scan_and_read_mor fixture table
+// and expects 11 rows.
+TEST_F(PKCountReaderTest, TestCountRowsWithMORSnapshot5) {
+    std::string table_path =
+        GetDataDir() + "/orc/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/";
+
+    ASSERT_OK_AND_ASSIGN(auto splits, CreateSplits(table_path, /*snapshot_id=*/5));
+    ASSERT_OK_AND_ASSIGN(auto internal_context, CreateInternalContext(table_path));
+    ASSERT_OK_AND_ASSIGN(auto path_factory, CreatePathFactory(internal_context));
+
+    ASSERT_OK_AND_ASSIGN(auto pk_count_reader,
+                         PKCountReader::Create(splits, path_factory, internal_context, pool_,
+                                               internal_context->GetExecutor()));
+    ASSERT_OK_AND_ASSIGN(int64_t count, pk_count_reader->CountRows());
+
+    ASSERT_EQ(count, 11);
+}
+
+// Counts the splits planned at snapshot 6 of the ORC pk_table_scan_and_read_dv fixture table
+// and expects 8 rows.
+TEST_F(PKCountReaderTest, TestCountRowsWithDVSnapshot6) {
+    std::string table_path =
+        GetDataDir() + "/orc/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/";
+
+    ASSERT_OK_AND_ASSIGN(auto splits, CreateSplits(table_path, /*snapshot_id=*/6));
+    ASSERT_OK_AND_ASSIGN(auto internal_context, CreateInternalContext(table_path));
+    ASSERT_OK_AND_ASSIGN(auto path_factory, CreatePathFactory(internal_context));
+
+    ASSERT_OK_AND_ASSIGN(auto pk_count_reader,
+                         PKCountReader::Create(splits, path_factory, internal_context, pool_,
+                                               internal_context->GetExecutor()));
+    ASSERT_OK_AND_ASSIGN(int64_t count, pk_count_reader->CountRows());
+
+    ASSERT_EQ(count, 8);
+}
+
+// An empty split list is accepted by Create() and CountRows() reports 0.
+TEST_F(PKCountReaderTest, TestCountRowsWithEmptySplits) {
+    std::string table_path =
+        GetDataDir() + "/orc/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/";
+
+    ASSERT_OK_AND_ASSIGN(auto internal_context, CreateInternalContext(table_path));
+    ASSERT_OK_AND_ASSIGN(auto path_factory, CreatePathFactory(internal_context));
+
+    std::vector> empty_splits;
+    ASSERT_OK_AND_ASSIGN(auto pk_count_reader,
+                         PKCountReader::Create(empty_splits, path_factory, internal_context, pool_,
+                                               internal_context->GetExecutor()));
+    ASSERT_OK_AND_ASSIGN(int64_t count, pk_count_reader->CountRows());
+
+    ASSERT_EQ(count, 0);
+}
+
+// Create() succeeds for a split of an unsupported type; the failure surfaces from CountRows()
+// with a message containing "split cannot be cast to DataSplitImpl".
+TEST_F(PKCountReaderTest, TestCountRowsWithInvalidSplit) {
+    std::string table_path =
+        GetDataDir() + "/orc/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/";
+
+    ASSERT_OK_AND_ASSIGN(auto internal_context, CreateInternalContext(table_path));
+    ASSERT_OK_AND_ASSIGN(auto path_factory, CreatePathFactory(internal_context));
+
+    std::vector> splits = {std::make_shared()};
+    ASSERT_OK_AND_ASSIGN(auto pk_count_reader,
+                         PKCountReader::Create(splits, path_factory, internal_context, pool_,
+                                               internal_context->GetExecutor()));
+
+    ASSERT_NOK_WITH_MSG(pk_count_reader->CountRows(), "split cannot be cast to DataSplitImpl");
+}
+
+}  // namespace paimon::test
diff --git
a/src/paimon/core/table/source/table_read.cpp b/src/paimon/core/table/source/table_read.cpp
index 2fbadf88e..89f340b6d 100644
--- a/src/paimon/core/table/source/table_read.cpp
+++ b/src/paimon/core/table/source/table_read.cpp
@@ -178,4 +178,10 @@ Result> TableRead::CreateReader(
     return std::make_unique(std::move(batch_readers), pool_);
 }
 
+// Base-class fallback for count queries: ignores `splits` and always returns
+// Status::NotImplemented. Table reads that support counting override this.
+Result> TableRead::CreateCountReader(
+    const std::vector>& splits) {
+    (void)splits;  // Parameter is intentionally unused in the fallback.
+    return Status::NotImplemented("CreateCountReader is not implemented for this table type");
+}
+
 }  // namespace paimon
diff --git a/test/inte/scan_and_read_inte_test.cpp b/test/inte/scan_and_read_inte_test.cpp
index e28ee4dcd..cc19a6388 100644
--- a/test/inte/scan_and_read_inte_test.cpp
+++ b/test/inte/scan_and_read_inte_test.cpp
@@ -37,6 +37,7 @@
 #include "paimon/core/io/data_file_meta.h"
 #include "paimon/core/table/source/data_split_impl.h"
 #include "paimon/core/table/source/deletion_file.h"
+#include "paimon/core/table/source/key_value_table_read.h"
 #include "paimon/defs.h"
 #include "paimon/fs/file_system.h"
 #include "paimon/fs/local/local_file_system.h"
@@ -333,6 +334,10 @@ TEST_P(ScanAndReadInteTest, TestWithAppendSnapshot1) {
         .ValueOrDie());
     ASSERT_TRUE(expected);
     ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString();
+
+    ASSERT_OK_AND_ASSIGN(auto count_reader, table_read->CreateCountReader(splits));
+    ASSERT_OK_AND_ASSIGN(int64_t count, count_reader->CountRows());
+    ASSERT_EQ(count, read_result->length());
 }
 
 TEST_P(ScanAndReadInteTest, TestWithAppendSnapshot3) {
@@ -418,6 +423,10 @@ TEST_P(ScanAndReadInteTest, TestWithAppendSnapshot5) {
         .ValueOrDie());
     ASSERT_TRUE(expected);
     ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString();
+
+    ASSERT_OK_AND_ASSIGN(auto count_reader, table_read->CreateCountReader(splits));
+    ASSERT_OK_AND_ASSIGN(int64_t count, count_reader->CountRows());
+    ASSERT_EQ(count, read_result->length());
 }
 
 TEST_P(ScanAndReadInteTest, TestWithAppendSnapshotWithStreamWithDefaultMode) {
@@ -596,6
+605,12 @@ TEST_F(ScanAndReadInteTest, TestWithPKWithDvBatchScanSnapshot6) { .ValueOrDie()); ASSERT_TRUE(expected); ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); + + // CountRows should match the number of visible rows returned by CreateReader. + ASSERT_OK_AND_ASSIGN(auto count_reader, + table_read->CreateCountReader(result_plan->Splits())); + ASSERT_OK_AND_ASSIGN(int64_t count, count_reader->CountRows()); + ASSERT_EQ(count, read_result->length()); }; for (auto [file_format, enable_prefetch] : GetTestValuesForScanAndReadInteTest()) { check_result(file_format); @@ -1066,7 +1081,8 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanLatestSnapshot) { ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 5); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); + auto splits = result_plan->Splits(); + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(splits)); ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get())); // check result @@ -1087,6 +1103,10 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanLatestSnapshot) { .ValueOrDie()); ASSERT_TRUE(expected); ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString(); + + ASSERT_OK_AND_ASSIGN(auto count_reader, table_read->CreateCountReader(splits)); + ASSERT_OK_AND_ASSIGN(int64_t count, count_reader->CountRows()); + ASSERT_EQ(count, read_result->length()); } TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot2) { @@ -1108,7 +1128,8 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot2) { ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); ASSERT_EQ(result_plan->SnapshotId().value(), 2); - ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits())); + auto splits = result_plan->Splits(); + ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(splits)); 
ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get()));
 
     // check result
@@ -1126,6 +1147,10 @@ TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot2) {
         .ValueOrDie());
     ASSERT_TRUE(expected);
     ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString();
+
+    ASSERT_OK_AND_ASSIGN(auto count_reader, table_read->CreateCountReader(splits));
+    ASSERT_OK_AND_ASSIGN(int64_t count, count_reader->CountRows());
+    ASSERT_EQ(count, read_result->length());
 }
 
 TEST_P(ScanAndReadInteTest, TestWithPKWithMorBatchScanSnapshot5WithPartitionAndBucketFilter) {
@@ -1791,7 +1816,9 @@ TEST_P(ScanAndReadInteTest, TestWithPKWith09VersionDvBatchScanLatestSnapshot) {
 [0, "Emily", 10, 0, 13.1],
 [0, "Alice", 10, 1, 21.1],
 [0, "Two roads diverged in a wood, and I took the one less traveled by, And that has made all the difference.", 10, 1, 11.0],
-[0, "Whether I shall turn out to be the hero of my own life.", 10, 1, 19.1]
+[0, "Whether I shall turn out to be the hero of my own life.", 10, 1, 19.1],
+[0, "Lucy", 20, 1, 14.1],
+[0, "Paul", 20, 1, 18.1]
 ])")
         .ValueOrDie());
     ASSERT_TRUE(expected);
@@ -2775,4 +2802,102 @@ TEST_P(ScanAndReadInteTest, TestWithPKBucketSelectByPredicate) {
     ASSERT_TRUE(expected->Equals(read_result)) << read_result->ToString();
 }
 
+// An empty split list yields a count reader whose CountRows() is 0.
+TEST_P(ScanAndReadInteTest, TestCountRowsEmptySplits) {
+    auto [file_format, enable_prefetch] = GetParam();
+    std::string table_path = paimon::test::GetDataDir() + file_format +
+                             "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/";
+
+    ReadContextBuilder read_context_builder(table_path);
+    AddReadOptionsForPrefetch(&read_context_builder);
+    ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context)));
+
+    // CreateCountReader with empty splits should return 0 rows.
+    std::vector> empty_splits;
+    ASSERT_OK_AND_ASSIGN(auto count_reader, table_read->CreateCountReader(empty_splits));
+    ASSERT_OK_AND_ASSIGN(int64_t count, count_reader->CountRows());
+    ASSERT_EQ(count, 0);
+}
+
+// The count path must agree with the number of rows materialized by the regular read path
+// for the same plan.
+TEST_P(ScanAndReadInteTest, TestCountRowsConsistencyWithCreateReader) {
+    auto [file_format, enable_prefetch] = GetParam();
+    std::string table_path = paimon::test::GetDataDir() + file_format +
+                             "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/";
+
+    // Scan latest snapshot (no snapshot id option is set on the scan context).
+    ScanContextBuilder scan_context_builder(table_path);
+    ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context)));
+    ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan());
+
+    // Method 1: CreateCountReader + CountRows (no batches are materialized by the test).
+    ReadContextBuilder count_context_builder(table_path);
+    AddReadOptionsForPrefetch(&count_context_builder);
+    ASSERT_OK_AND_ASSIGN(auto count_read_context, count_context_builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto count_table_read, TableRead::Create(std::move(count_read_context)));
+    ASSERT_OK_AND_ASSIGN(auto count_reader,
+                         count_table_read->CreateCountReader(result_plan->Splits()));
+    ASSERT_OK_AND_ASSIGN(int64_t count_result, count_reader->CountRows());
+
+    // Method 2: CreateReader + collect all batches, then take the length of the result.
+    ReadContextBuilder read_context_builder(table_path);
+    AddReadOptionsForPrefetch(&read_context_builder);
+    ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context)));
+    ASSERT_OK_AND_ASSIGN(auto batch_reader, table_read->CreateReader(result_plan->Splits()));
+    ASSERT_OK_AND_ASSIGN(auto read_result, ReadResultCollector::CollectResult(batch_reader.get()));
+    // A null result is treated as zero rows.
+    int64_t iterate_count = read_result ? read_result->length() : 0;
+
+    // Both methods should return the same count
+    ASSERT_EQ(count_result, iterate_count);
+}
+
+// CreateCountReader must fail when the read context carries a predicate.
+TEST_P(ScanAndReadInteTest, TestCreateCountReaderWithPredicateNotSupported) {
+    auto [file_format, enable_prefetch] = GetParam();
+    std::string table_path = paimon::test::GetDataDir() + file_format +
+                             "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/";
+
+    // Create splits from latest snapshot.
+    ScanContextBuilder scan_context_builder(table_path);
+    ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context)));
+    ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan());
+
+    // Set predicate in read context. CountReader currently does not support predicate pushdown.
+    auto predicate = PredicateBuilder::Equal(/*field_index=*/2, /*field_name=*/"f2", FieldType::INT,
+                                             Literal(static_cast(0)));
+    ReadContextBuilder read_context_builder(table_path);
+    AddReadOptionsForPrefetch(&read_context_builder);
+    read_context_builder.SetPredicate(predicate);
+    ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context)));
+
+    // The error is expected at reader creation, before any counting happens.
+    ASSERT_NOK_WITH_MSG(table_read->CreateCountReader(result_plan->Splits()),
+                        "predicate pushdown is not supported");
+}
+
+// CreateCountReader must fail once ForceKeepDelete(true) has been set on the table read.
+TEST_P(ScanAndReadInteTest, TestCreateCountReaderWithForceKeepDeleteNotSupported) {
+    auto [file_format, enable_prefetch] = GetParam();
+    std::string table_path = paimon::test::GetDataDir() + file_format +
+                             "/pk_table_scan_and_read_mor.db/pk_table_scan_and_read_mor/";
+
+    // Create splits from latest snapshot.
+    ScanContextBuilder scan_context_builder(table_path);
+    ASSERT_OK_AND_ASSIGN(auto scan_context, scan_context_builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context)));
+    ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan());
+
+    ReadContextBuilder read_context_builder(table_path);
+    AddReadOptionsForPrefetch(&read_context_builder);
+    ASSERT_OK_AND_ASSIGN(auto read_context, read_context_builder.Finish());
+    ASSERT_OK_AND_ASSIGN(auto table_read, TableRead::Create(std::move(read_context)));
+
+    // ForceKeepDelete is reached through a downcast of the TableRead returned by Create().
+    auto* key_value_table_read = dynamic_cast(table_read.get());
+    ASSERT_TRUE(key_value_table_read != nullptr);
+    key_value_table_read->ForceKeepDelete(true);
+
+    ASSERT_NOK_WITH_MSG(table_read->CreateCountReader(result_plan->Splits()),
+                        "force_keep_delete is not supported");
+}
+
 }  // namespace paimon::test