Skip to content
Merged
33 changes: 33 additions & 0 deletions include/paimon/reader/count_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2026-present Alibaba Inc.
Comment thread
lucasfang marked this conversation as resolved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "paimon/result.h"
#include "paimon/visibility.h"

namespace paimon {

/// Reader abstraction for count queries.
class PAIMON_EXPORT CountReader {
public:
virtual ~CountReader() = default;

/// Count rows for splits bound by the corresponding CreateCountReader call.
virtual Result<int64_t> CountRows() = 0;
};

} // namespace paimon
7 changes: 7 additions & 0 deletions include/paimon/table/source/table_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "paimon/memory/memory_pool.h"
#include "paimon/read_context.h"
#include "paimon/reader/batch_reader.h"
#include "paimon/reader/count_reader.h"
#include "paimon/result.h"
#include "paimon/table/source/split.h"
#include "paimon/visibility.h"
Expand Down Expand Up @@ -63,6 +64,12 @@ class PAIMON_EXPORT TableRead {
virtual Result<std::unique_ptr<BatchReader>> CreateReader(
const std::shared_ptr<Split>& split) = 0;

/// Creates a `CountReader` for count queries on the specified splits.
///
/// Implementations may override this to provide a more efficient count path.
virtual Result<std::unique_ptr<CountReader>> CreateCountReader(
const std::vector<std::shared_ptr<Split>>& splits);
Comment thread
lxy-9602 marked this conversation as resolved.

protected:
explicit TableRead(const std::shared_ptr<MemoryPool>& memory_pool);

Expand Down
5 changes: 5 additions & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ set(PAIMON_CORE_SRCS
core/mergetree/lookup_file.cpp
core/mergetree/lookup_levels.cpp
core/mergetree/lookup/remote_lookup_file_manager.cpp
core/mergetree/row_count_accumulator.cpp
core/migrate/file_meta_utils.cpp
core/operation/data_evolution_file_store_scan.cpp
core/operation/data_evolution_split_read.cpp
Expand Down Expand Up @@ -305,13 +306,15 @@ set(PAIMON_CORE_SRCS
core/table/sink/commit_message.cpp
core/table/sink/commit_message_impl.cpp
core/table/sink/commit_message_serializer.cpp
core/table/source/append_count_reader.cpp
core/table/source/append_only_table_read.cpp
core/table/source/split.cpp
core/table/source/data_split_impl.cpp
core/table/source/data_table_batch_scan.cpp
core/table/source/data_table_stream_scan.cpp
core/table/source/fallback_table_read.cpp
core/table/source/key_value_table_read.cpp
core/table/source/pk_count_reader.cpp
core/table/source/merge_tree_split_generator.cpp
core/table/source/data_evolution_split_generator.cpp
core/table/source/plan_impl.cpp
Expand Down Expand Up @@ -703,6 +706,8 @@ if(PAIMON_BUILD_TESTS)
core/table/sink/commit_message_impl_test.cpp
core/table/source/fallback_data_split_test.cpp
core/table/source/table_read_test.cpp
core/table/source/append_count_reader_test.cpp
core/table/source/pk_count_reader_test.cpp
core/table/source/data_split_test.cpp
core/table/source/deletion_file_test.cpp
core/table/source/split_generator_test.cpp
Expand Down
17 changes: 17 additions & 0 deletions src/paimon/core/deletionvectors/deletion_vector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "paimon/core/deletionvectors/deletion_vector.h"

#include <cassert>
#include <cstddef>
#include <string>

Expand Down Expand Up @@ -59,6 +60,22 @@ DeletionVector::Factory DeletionVector::CreateFactory(
};
}

std::unordered_map<std::string, DeletionFile> DeletionVector::CreateDeletionFileMap(
const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
const std::vector<std::optional<DeletionFile>>& deletion_files) {
std::unordered_map<std::string, DeletionFile> deletion_file_map;
if (deletion_files.empty()) {
return deletion_file_map;
}
assert(deletion_files.size() == data_files.size());
for (size_t i = 0; i < deletion_files.size(); i++) {
if (deletion_files[i] != std::nullopt) {
deletion_file_map.emplace(data_files[i]->file_name, deletion_files[i].value());
}
}
return deletion_file_map;
}

Result<PAIMON_UNIQUE_PTR<DeletionVector>> DeletionVector::DeserializeFromBytes(const Bytes* bytes,
MemoryPool* pool) {
return BitmapDeletionVector::Deserialize(bytes->data(), bytes->size(), pool);
Expand Down
8 changes: 8 additions & 0 deletions src/paimon/core/deletionvectors/deletion_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ class DeletionVector {

static Factory CreateFactory(const std::shared_ptr<BucketedDvMaintainer>& dv_maintainer);

/// Builds a map from data file name to its deletion file.
///
/// Entries whose deletion file is absent are skipped. Returns an empty map when
/// `deletion_files` is empty.
static std::unordered_map<std::string, DeletionFile> CreateDeletionFileMap(
const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
const std::vector<std::optional<DeletionFile>>& deletion_files);

virtual ~DeletionVector() = default;

/// Marks the row at the specified position as deleted.
Expand Down
34 changes: 34 additions & 0 deletions src/paimon/core/deletionvectors/deletion_vector_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <optional>
#include <set>
#include <vector>

#include "gtest/gtest.h"
#include "paimon/core/deletionvectors/bitmap64_deletion_vector.h"
#include "paimon/core/deletionvectors/bitmap_deletion_vector.h"
#include "paimon/core/io/data_file_meta.h"
#include "paimon/core/table/source/deletion_file.h"
#include "paimon/io/byte_array_input_stream.h"
#include "paimon/io/byte_order.h"
#include "paimon/io/data_input_stream.h"
Expand All @@ -41,6 +45,16 @@ void AppendInt32BigEndian(std::vector<uint8_t>* bytes, int32_t value) {
bytes->push_back(static_cast<uint8_t>(value & 0xFF));
}

std::shared_ptr<DataFileMeta> CreateDataFileMeta(const std::string& file_name) {
return std::make_shared<DataFileMeta>(
file_name, /*file_size=*/100, /*row_count=*/10, DataFileMeta::EmptyMinKey(),
DataFileMeta::EmptyMaxKey(), SimpleStats::EmptyStats(), SimpleStats::EmptyStats(),
/*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0,
DataFileMeta::DUMMY_LEVEL, std::vector<std::optional<std::string>>{}, Timestamp(0, 0),
std::nullopt, nullptr, FileSource::Append(), std::nullopt, std::nullopt, std::nullopt,
std::nullopt);
}

} // namespace

TEST(DeletionVectorTest, TestSimple) {
Expand Down Expand Up @@ -161,4 +175,24 @@ TEST(DeletionVectorTest, ReadFromDataInputStreamInvalidMagicNumber) {
"Invalid magic number");
}

TEST(DeletionVectorTest, CreateDeletionFileMap) {
std::vector<std::shared_ptr<DataFileMeta>> data_files = {CreateDataFileMeta("file-0.orc"),
CreateDataFileMeta("file-1.orc"),
CreateDataFileMeta("file-2.orc")};

auto empty_map = DeletionVector::CreateDeletionFileMap(data_files, {});
ASSERT_TRUE(empty_map.empty());

DeletionFile deletion_file_0("dv-0", /*offset=*/10, /*length=*/20, /*cardinality=*/3);
DeletionFile deletion_file_2("dv-2", /*offset=*/30, /*length=*/40, std::nullopt);
std::vector<std::optional<DeletionFile>> deletion_files = {deletion_file_0, std::nullopt,
deletion_file_2};

auto deletion_file_map = DeletionVector::CreateDeletionFileMap(data_files, deletion_files);
ASSERT_EQ(deletion_file_map.size(), 2);
ASSERT_EQ(deletion_file_map.at("file-0.orc"), deletion_file_0);
ASSERT_EQ(deletion_file_map.count("file-1.orc"), 0);
ASSERT_EQ(deletion_file_map.at("file-2.orc"), deletion_file_2);
}

} // namespace paimon::test
66 changes: 66 additions & 0 deletions src/paimon/core/mergetree/row_count_accumulator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "paimon/core/mergetree/row_count_accumulator.h"

#include <utility>

#include "paimon/core/key_value.h"

namespace paimon {

RowCountAccumulator::RowCountAccumulator(std::unique_ptr<SortMergeReader>&& merged_reader)
: merged_reader_(std::move(merged_reader)) {}

Result<int64_t> RowCountAccumulator::CountAll() {
int64_t count = 0;

while (true) {
// Get next batch of merged KV iterators
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<SortMergeReader::Iterator> iter,
merged_reader_->NextBatch());
if (iter == nullptr) {
// No more data
break;
}

// Iterate through all KV objects in this batch
while (true) {
PAIMON_ASSIGN_OR_RAISE(bool has_next, iter->HasNext());
if (!has_next) {
break;
}

iter->Next();

// At this point:
// - kv has passed through SortMergeReader (deduplicated, merged)
// - kv has passed through DropDeleteReader (kind is guaranteed IsAdd())
// - kv represents a final, valid, non-deleted row
count++;
}
}

return count;
}

void RowCountAccumulator::Close() {
if (merged_reader_) {
merged_reader_->Close();
}
}

} // namespace paimon
47 changes: 47 additions & 0 deletions src/paimon/core/mergetree/row_count_accumulator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstdint>
#include <memory>

#include "paimon/core/mergetree/compact/sort_merge_reader.h"
#include "paimon/result.h"

namespace paimon {

/// Counts rows from a merged KeyValue stream after delete rows are dropped.
class RowCountAccumulator {
public:
/// @param merged_reader The merged reader. Must be wrapped with DropDeleteReader
/// so that only valid (non-deleted) KeyValue objects are output.
explicit RowCountAccumulator(std::unique_ptr<SortMergeReader>&& merged_reader);

~RowCountAccumulator() = default;

/// Count all valid rows from the merge reader.
/// Iterates through all merged+deduplicated+non-deleted KeyValue objects.
Result<int64_t> CountAll();

/// Close underlying readers and release resources.
void Close();

private:
std::unique_ptr<SortMergeReader> merged_reader_;
};

} // namespace paimon
23 changes: 0 additions & 23 deletions src/paimon/core/operation/abstract_split_read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,29 +102,6 @@ bool AbstractSplitRead::NeedCompleteRowTrackingFields(
}
return false;
}

std::unordered_map<std::string, DeletionFile> AbstractSplitRead::CreateDeletionFileMap(
const DataSplitImpl& data_split) {
return CreateDeletionFileMap(data_split.DataFiles(), data_split.DeletionFiles());
}

std::unordered_map<std::string, DeletionFile> AbstractSplitRead::CreateDeletionFileMap(
const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
const std::vector<std::optional<DeletionFile>>& deletion_files) {
std::unordered_map<std::string, DeletionFile> deletion_file_map;
if (deletion_files.empty()) {
return deletion_file_map;
}
assert(deletion_files.size() == data_files.size());
size_t file_count = deletion_files.size();
for (size_t i = 0; i < file_count; i++) {
if (deletion_files[i] != std::nullopt) {
deletion_file_map.emplace(data_files[i]->file_name, deletion_files[i].value());
}
}
return deletion_file_map;
}

Result<std::unique_ptr<BatchReader>> AbstractSplitRead::ApplyPredicateFilterIfNeeded(
std::unique_ptr<BatchReader>&& reader, const std::shared_ptr<Predicate>& predicate) const {
if (!context_->EnablePredicateFilter() || predicate == nullptr) {
Expand Down
10 changes: 0 additions & 10 deletions src/paimon/core/operation/abstract_split_read.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

#include "arrow/type_fwd.h"
Expand All @@ -30,7 +29,6 @@
#include "paimon/core/operation/split_read.h"
#include "paimon/core/schema/schema_manager.h"
#include "paimon/core/table/source/data_split_impl.h"
#include "paimon/core/table/source/deletion_file.h"
#include "paimon/core/utils/file_store_path_factory.h"
#include "paimon/format/reader_builder.h"
#include "paimon/reader/batch_reader.h"
Expand Down Expand Up @@ -73,14 +71,6 @@ class AbstractSplitRead : public SplitRead {
std::unique_ptr<SchemaManager>&& schema_manager,
const std::shared_ptr<MemoryPool>& memory_pool,
const std::shared_ptr<Executor>& executor);

static std::unordered_map<std::string, DeletionFile> CreateDeletionFileMap(
const DataSplitImpl& data_split);

static std::unordered_map<std::string, DeletionFile> CreateDeletionFileMap(
const std::vector<std::shared_ptr<DataFileMeta>>& data_files,
const std::vector<std::optional<DeletionFile>>& deletion_files);

Result<std::unique_ptr<BatchReader>> ApplyPredicateFilterIfNeeded(
std::unique_ptr<BatchReader>&& reader, const std::shared_ptr<Predicate>& predicate) const;

Expand Down
9 changes: 9 additions & 0 deletions src/paimon/core/operation/internal_read_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,13 @@ InternalReadContext::InternalReadContext(const std::shared_ptr<ReadContext>& rea
read_schema_(read_schema),
options_(options) {}

Result<std::shared_ptr<InternalReadContext>> InternalReadContext::CreateWithSchema(
const std::shared_ptr<InternalReadContext>& original,
const std::shared_ptr<arrow::Schema>& new_read_schema) {
// Create a new InternalReadContext sharing all properties except read_schema.
// The new read_schema is the minimal column set for COUNT(*).
return std::shared_ptr<InternalReadContext>(new InternalReadContext(
original->read_context_, original->table_schema_, new_read_schema, original->options_));
}

} // namespace paimon
8 changes: 8 additions & 0 deletions src/paimon/core/operation/internal_read_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,14 @@ class InternalReadContext {
return read_context_->GetCacheConfig();
}

/// Create a new InternalReadContext with a different read schema.
/// Useful for creating a context with a minimal column set for specialized reads.
/// All other settings (predicate, options, table_schema, etc.) are inherited
/// from the original context.
static Result<std::shared_ptr<InternalReadContext>> CreateWithSchema(
const std::shared_ptr<InternalReadContext>& original,
const std::shared_ptr<arrow::Schema>& new_read_schema);

private:
InternalReadContext(const std::shared_ptr<ReadContext>& read_context,
const std::shared_ptr<TableSchema>& table_schema,
Expand Down
Loading
Loading