Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions include/paimon/read_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "paimon/predicate/predicate.h"
#include "paimon/result.h"
#include "paimon/type_fwd.h"
#include "paimon/utils/special_field_ids.h"
#include "paimon/visibility.h"

namespace paimon {
Expand All @@ -43,6 +44,7 @@ class PAIMON_EXPORT ReadContext {
public:
ReadContext(const std::string& path, const std::string& branch,
const std::vector<std::string>& read_schema,
const std::vector<int32_t>& read_field_ids,
const std::shared_ptr<Predicate>& predicate, bool enable_predicate_filter,
bool enable_prefetch, uint32_t prefetch_batch_count,
uint32_t prefetch_max_parallel_num, bool enable_multi_thread_row_to_batch,
Expand Down Expand Up @@ -74,6 +76,10 @@ class PAIMON_EXPORT ReadContext {
return read_schema_;
}

/// Returns the field ids requested for reading, as passed to the constructor
/// through `read_field_ids`. The returned reference aliases the member stored in this
/// context.
const std::vector<int32_t>& GetReadFieldIds() const {
return read_field_ids_;
}

const std::shared_ptr<Predicate>& GetPredicate() const {
return predicate_;
}
Expand Down Expand Up @@ -113,6 +119,7 @@ class PAIMON_EXPORT ReadContext {
std::string path_;
std::string branch_;
std::vector<std::string> read_schema_;
std::vector<int32_t> read_field_ids_;
std::shared_ptr<Predicate> predicate_;
bool enable_predicate_filter_;
bool enable_prefetch_;
Expand Down Expand Up @@ -151,6 +158,19 @@ class PAIMON_EXPORT ReadContextBuilder {
/// @note Currently supports top-level field selection. Future versions may support
/// nested field selection using ArrowSchema for more granular projection
ReadContextBuilder& SetReadSchema(const std::vector<std::string>& read_field_names);
/// Set the fields to read from the table, selected by field id.
///
/// If neither field ids nor a read schema are set, all fields from the table schema
/// will be read. This is useful for projection pushdown to reduce I/O and improve
/// performance by reading only the required columns.
///
/// @param read_field_ids Vector of field ids to read from the table. An empty vector is
///        treated the same as not calling this method.
/// @return Reference to this builder for method chaining.
/// @note Currently supports top-level field selection. Future versions may support
///       nested field selection using ArrowSchema for more granular projection.
/// @note SetReadFieldIds() and SetReadSchema() are mutually exclusive. If both are
///       called, the field ids take precedence and the read schema set by
///       SetReadSchema() is ignored.
ReadContextBuilder& SetReadFieldIds(const std::vector<int32_t>& read_field_ids);

/// Set a configuration options map to set some option entries which are not defined in the
/// table schema or whose values you want to overwrite.
Expand Down
42 changes: 42 additions & 0 deletions include/paimon/utils/special_field_ids.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2026-present Alibaba Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <cstdint>
#include <limits>

namespace paimon {

/// Holder of the reserved field ids that identify special (system) fields in metadata.
///
/// This header only declares the constants; the value quoted next to each one is the
/// documented value of its out-of-line definition.
class SpecialFieldIds {
 public:
    /// Reserved field id of the sequence number field. Value: INT32_MAX - 1
    static const int32_t SEQUENCE_NUMBER;

    /// Reserved field id of the value kind field. Value: INT32_MAX - 2
    static const int32_t VALUE_KIND;

    /// Reserved field id of the row ID field. Value: INT32_MAX - 5
    static const int32_t ROW_ID;

    /// Reserved field id of the index score field. Value: CPP_FIELD_ID_END - 1
    static const int32_t INDEX_SCORE;

 protected:
    /// System defined boundary for field ids. Value: INT32_MAX - 10000
    static const int32_t CPP_FIELD_ID_END;
};

} // namespace paimon
3 changes: 2 additions & 1 deletion src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ set(PAIMON_CORE_SRCS
core/utils/manifest_meta_reader.cpp
core/utils/partition_path_utils.cpp
core/utils/primary_key_table_utils.cpp
core/utils/snapshot_manager.cpp)
core/utils/snapshot_manager.cpp
core/utils/special_field_ids.cpp)

add_paimon_lib(paimon
SOURCES
Expand Down
17 changes: 8 additions & 9 deletions src/paimon/common/table/special_fields.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,35 @@

#include "arrow/type_fwd.h"
#include "paimon/common/types/data_field.h"
#include "paimon/utils/special_field_ids.h"

namespace paimon {

struct SpecialFields {
static constexpr char KEY_FIELD_PREFIX[] = "_KEY_";
static constexpr int32_t KEY_VALUE_SPECIAL_FIELD_COUNT = 2;
static constexpr int32_t CPP_FIELD_ID_END = std::numeric_limits<int32_t>::max() - 10000;

static const DataField& SequenceNumber() {
static const DataField data_field =
DataField(std::numeric_limits<int32_t>::max() - 1,
arrow::field("_SEQUENCE_NUMBER", arrow::int64()));
static const DataField data_field = DataField(
SpecialFieldIds::SEQUENCE_NUMBER, arrow::field("_SEQUENCE_NUMBER", arrow::int64()));
return data_field;
}

static const DataField& ValueKind() {
static const DataField data_field = DataField(std::numeric_limits<int32_t>::max() - 2,
arrow::field("_VALUE_KIND", arrow::int8()));
static const DataField data_field =
DataField(SpecialFieldIds::VALUE_KIND, arrow::field("_VALUE_KIND", arrow::int8()));
return data_field;
}

static const DataField& RowId() {
static const DataField data_field = DataField(std::numeric_limits<int32_t>::max() - 5,
arrow::field("_ROW_ID", arrow::int64()));
static const DataField data_field =
DataField(SpecialFieldIds::ROW_ID, arrow::field("_ROW_ID", arrow::int64()));
return data_field;
}

static const DataField& IndexScore() {
static const DataField data_field =
DataField(CPP_FIELD_ID_END - 1, arrow::field("_INDEX_SCORE", arrow::float32()));
DataField(SpecialFieldIds::INDEX_SCORE, arrow::field("_INDEX_SCORE", arrow::float32()));
return data_field;
}

Expand Down
58 changes: 40 additions & 18 deletions src/paimon/core/operation/internal_read_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,26 +38,48 @@ Result<std::unique_ptr<InternalReadContext>> InternalReadContext::Create(
context->GetSpecificFileSystem()));
// prepare read schema
std::vector<DataField> read_data_fields;
read_data_fields.reserve(context->GetReadSchema().size());
for (const auto& name : context->GetReadSchema()) {
// if enable row tracking or data evolution, check special fields
if (core_options.RowTrackingEnabled() && name == SpecialFields::RowId().Name()) {
read_data_fields.push_back(SpecialFields::RowId());
continue;
if (!context->GetReadFieldIds().empty()) {
read_data_fields.reserve(context->GetReadFieldIds().size());
for (const auto& field_id : context->GetReadFieldIds()) {
// if enable row tracking or data evolution, check special fields
if (core_options.RowTrackingEnabled() && field_id == SpecialFields::RowId().Id()) {
read_data_fields.push_back(SpecialFields::RowId());
continue;
}
if (core_options.RowTrackingEnabled() &&
field_id == SpecialFields::SequenceNumber().Id()) {
read_data_fields.push_back(SpecialFields::SequenceNumber());
continue;
}
if (core_options.DataEvolutionEnabled() &&
field_id == SpecialFields::IndexScore().Id()) {
read_data_fields.push_back(SpecialFields::IndexScore());
continue;
}
PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema->GetField(field_id));
read_data_fields.push_back(field);
}
if (core_options.RowTrackingEnabled() && name == SpecialFields::SequenceNumber().Name()) {
read_data_fields.push_back(SpecialFields::SequenceNumber());
continue;
} else if (!context->GetReadSchema().empty()) {
read_data_fields.reserve(context->GetReadSchema().size());
for (const auto& name : context->GetReadSchema()) {
// if enable row tracking or data evolution, check special fields
if (core_options.RowTrackingEnabled() && name == SpecialFields::RowId().Name()) {
read_data_fields.push_back(SpecialFields::RowId());
continue;
}
if (core_options.RowTrackingEnabled() &&
name == SpecialFields::SequenceNumber().Name()) {
read_data_fields.push_back(SpecialFields::SequenceNumber());
continue;
}
if (core_options.DataEvolutionEnabled() && name == SpecialFields::IndexScore().Name()) {
read_data_fields.push_back(SpecialFields::IndexScore());
continue;
}
PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema->GetField(name));
read_data_fields.push_back(field);
}
if (core_options.DataEvolutionEnabled() && name == SpecialFields::IndexScore().Name()) {
read_data_fields.push_back(SpecialFields::IndexScore());
continue;
}
PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema->GetField(name));
read_data_fields.push_back(field);
}

if (read_data_fields.empty()) {
} else {
// if field names not set, read all fields
read_data_fields = table_schema->Fields();
}
Expand Down
63 changes: 63 additions & 0 deletions src/paimon/core/operation/internal_read_context_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,41 @@ TEST(InternalReadContext, TestReadWithSpecifiedSchema) {
ASSERT_TRUE(internal_context->GetReadSchema()->Equals(expected_schema));
}

// Projection by field id only: the resulting read schema must contain exactly the
// requested fields, in the order the ids were given (f3 before f0).
TEST(InternalReadContext, TestReadWithSpecifiedFieldId) {
    std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09";
    ReadContextBuilder builder(table_path);
    builder.SetReadFieldIds({3, 0});
    ASSERT_OK_AND_ASSIGN(auto read_ctx, builder.Finish());

    // The path is read from the context before the context is moved into Create().
    SchemaManager schema_mgr(std::make_shared<LocalFileSystem>(), read_ctx->GetPath());
    ASSERT_OK_AND_ASSIGN(auto schema, schema_mgr.ReadSchema(0));
    ASSERT_OK_AND_ASSIGN(
        auto internal_ctx,
        InternalReadContext::Create(std::move(read_ctx), schema, schema->Options()));

    std::vector<DataField> expected_fields = {
        DataField(3, arrow::field("f3", arrow::float64())),
        DataField(0, arrow::field("f0", arrow::utf8()))};
    auto expected = DataField::ConvertDataFieldsToArrowSchema(expected_fields);
    ASSERT_TRUE(internal_ctx->GetReadSchema()->Equals(expected));
}

// Both a name-based read schema and field ids are supplied: the field ids win, so the
// resulting read schema is {f3, f0} rather than the name-based {"f0"}.
TEST(InternalReadContext, TestReadWithSpecifiedFieldIdAndSchema) {
    std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09";
    ReadContextBuilder builder(table_path);
    // The name-based selection below is expected to be ignored once field ids are set.
    builder.SetReadSchema({"f0"});
    builder.SetReadFieldIds({3, 0});
    ASSERT_OK_AND_ASSIGN(auto read_ctx, builder.Finish());

    // The path is read from the context before the context is moved into Create().
    SchemaManager schema_mgr(std::make_shared<LocalFileSystem>(), read_ctx->GetPath());
    ASSERT_OK_AND_ASSIGN(auto schema, schema_mgr.ReadSchema(0));
    ASSERT_OK_AND_ASSIGN(
        auto internal_ctx,
        InternalReadContext::Create(std::move(read_ctx), schema, schema->Options()));

    std::vector<DataField> expected_fields = {
        DataField(3, arrow::field("f3", arrow::float64())),
        DataField(0, arrow::field("f0", arrow::utf8()))};
    auto expected = DataField::ConvertDataFieldsToArrowSchema(expected_fields);
    ASSERT_TRUE(internal_ctx->GetReadSchema()->Equals(expected));
}

TEST(InternalReadContext, TestReadWithRowTrackingAndScoreFields) {
{
// test simple
Expand Down Expand Up @@ -111,4 +146,32 @@ TEST(InternalReadContext, TestReadWithRowTrackingAndScoreFields) {
}
}

// Field-id projection mixing regular fields with special field ids. With row tracking
// and data evolution switched on through the options, the row id, sequence number and
// index score ids must resolve to the corresponding SpecialFields entries.
TEST(InternalReadContext, TestReadWithFieldIdsAndSpecialFields) {
    std::string table_path = paimon::test::GetDataDir() + "/orc/append_09.db/append_09";
    ReadContextBuilder builder(table_path);
    // Select by id rather than by name, including the reserved ids.
    builder.SetReadFieldIds({3, 0, SpecialFieldIds::ROW_ID, SpecialFieldIds::SEQUENCE_NUMBER,
                             SpecialFieldIds::INDEX_SCORE});
    ASSERT_OK_AND_ASSIGN(auto read_ctx, builder.Finish());

    // The path is read from the context before the context is moved into Create().
    SchemaManager schema_mgr(std::make_shared<LocalFileSystem>(), read_ctx->GetPath());
    ASSERT_OK_AND_ASSIGN(auto schema, schema_mgr.ReadSchema(0));

    // Start from the table's own options and enable the two features under test.
    auto options = schema->Options();
    options[Options::ROW_TRACKING_ENABLED] = "true";
    options[Options::DATA_EVOLUTION_ENABLED] = "true";
    ASSERT_OK_AND_ASSIGN(auto internal_ctx,
                         InternalReadContext::Create(std::move(read_ctx), schema, options));

    std::vector<DataField> expected_fields = {
        DataField(3, arrow::field("f3", arrow::float64())),
        DataField(0, arrow::field("f0", arrow::utf8())), SpecialFields::RowId(),
        SpecialFields::SequenceNumber(), SpecialFields::IndexScore()};
    auto expected = DataField::ConvertDataFieldsToArrowSchema(expected_fields);
    ASSERT_TRUE(internal_ctx->GetReadSchema()->Equals(expected));
}

} // namespace paimon::test
43 changes: 25 additions & 18 deletions src/paimon/core/operation/read_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,20 @@
namespace paimon {
class Predicate;

ReadContext::ReadContext(const std::string& path, const std::string& branch,
const std::vector<std::string>& read_schema,
const std::shared_ptr<Predicate>& predicate, bool enable_predicate_filter,
bool enable_prefetch, uint32_t prefetch_batch_count,
uint32_t prefetch_max_parallel_num, bool enable_multi_thread_row_to_batch,
uint32_t row_to_batch_thread_number,
const std::optional<std::string>& table_schema,
const std::shared_ptr<MemoryPool>& memory_pool,
const std::shared_ptr<Executor>& executor,
const std::shared_ptr<FileSystem>& specific_file_system,
const std::map<std::string, std::string>& fs_scheme_to_identifier_map,
const std::map<std::string, std::string>& options)
ReadContext::ReadContext(
const std::string& path, const std::string& branch, const std::vector<std::string>& read_schema,
const std::vector<int32_t>& read_field_ids, const std::shared_ptr<Predicate>& predicate,
bool enable_predicate_filter, bool enable_prefetch, uint32_t prefetch_batch_count,
uint32_t prefetch_max_parallel_num, bool enable_multi_thread_row_to_batch,
uint32_t row_to_batch_thread_number, const std::optional<std::string>& table_schema,
const std::shared_ptr<MemoryPool>& memory_pool, const std::shared_ptr<Executor>& executor,
const std::shared_ptr<FileSystem>& specific_file_system,
const std::map<std::string, std::string>& fs_scheme_to_identifier_map,
const std::map<std::string, std::string>& options)
: path_(path),
branch_(branch),
read_schema_(read_schema),
read_field_ids_(read_field_ids),
predicate_(predicate),
enable_predicate_filter_(enable_predicate_filter),
enable_prefetch_(enable_prefetch),
Expand All @@ -64,6 +63,7 @@ class ReadContextBuilder::Impl {
void Reset() {
branch_ = BranchManager::DEFAULT_MAIN_BRANCH;
read_field_names_.clear();
read_field_ids_.clear();
fs_scheme_to_identifier_map_.clear();
options_.clear();
predicate_.reset();
Expand All @@ -83,6 +83,7 @@ class ReadContextBuilder::Impl {
std::string path_;
std::string branch_ = BranchManager::DEFAULT_MAIN_BRANCH;
std::vector<std::string> read_field_names_;
std::vector<int32_t> read_field_ids_;
std::map<std::string, std::string> fs_scheme_to_identifier_map_;
std::map<std::string, std::string> options_;
std::shared_ptr<Predicate> predicate_;
Expand Down Expand Up @@ -125,6 +126,12 @@ ReadContextBuilder& ReadContextBuilder::SetReadSchema(
return *this;
}

// Stores a copy of the requested field ids on the builder's Impl and returns the
// builder itself so calls can be chained. Any ids set by an earlier call are replaced.
ReadContextBuilder& ReadContextBuilder::SetReadFieldIds(
const std::vector<int32_t>& read_field_ids) {
impl_->read_field_ids_ = read_field_ids;
return *this;
}

ReadContextBuilder& ReadContextBuilder::SetPredicate(const std::shared_ptr<Predicate>& predicate) {
impl_->predicate_ = predicate;
return *this;
Expand Down Expand Up @@ -216,12 +223,12 @@ Result<std::unique_ptr<ReadContext>> ReadContextBuilder::Finish() {
return Status::Invalid("row to batch thread number should be greater than 0");
}
auto ctx = std::make_unique<ReadContext>(
impl_->path_, impl_->branch_, impl_->read_field_names_, impl_->predicate_,
impl_->enable_predicate_filter_, impl_->enable_prefetch_, impl_->prefetch_batch_count_,
impl_->prefetch_max_parallel_num_, impl_->enable_multi_thread_row_to_batch_,
impl_->row_to_batch_thread_number_, impl_->table_schema_, impl_->memory_pool_,
impl_->executor_, impl_->specific_file_system_, impl_->fs_scheme_to_identifier_map_,
impl_->options_);
impl_->path_, impl_->branch_, impl_->read_field_names_, impl_->read_field_ids_,
impl_->predicate_, impl_->enable_predicate_filter_, impl_->enable_prefetch_,
impl_->prefetch_batch_count_, impl_->prefetch_max_parallel_num_,
impl_->enable_multi_thread_row_to_batch_, impl_->row_to_batch_thread_number_,
impl_->table_schema_, impl_->memory_pool_, impl_->executor_, impl_->specific_file_system_,
impl_->fs_scheme_to_identifier_map_, impl_->options_);
impl_->Reset();
return ctx;
}
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/core/operation/read_context_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ TEST(ReadContextTest, TestSimple) {
ASSERT_TRUE(ctx->GetMemoryPool());
ASSERT_TRUE(ctx->GetExecutor());
ASSERT_TRUE(ctx->GetReadSchema().empty());
ASSERT_TRUE(ctx->GetReadFieldIds().empty());
ASSERT_TRUE(ctx->GetOptions().empty());
ASSERT_FALSE(ctx->GetPredicate());
ASSERT_FALSE(ctx->EnablePredicateFilter());
Expand All @@ -50,6 +51,7 @@ TEST(ReadContextTest, TestSetContent) {
ReadContextBuilder builder("table_root_path");
builder.AddOption("key", "value");
builder.SetReadSchema({"f1", "f2"});
builder.SetReadFieldIds({0, 1});
auto predicate =
PredicateBuilder::IsNull(/*field_index=*/0, /*field_name=*/"f1", FieldType::INT);
builder.SetPredicate(predicate);
Expand All @@ -70,6 +72,7 @@ TEST(ReadContextTest, TestSetContent) {
ASSERT_TRUE(ctx->GetMemoryPool());
ASSERT_TRUE(ctx->GetExecutor());
ASSERT_EQ(ctx->GetReadSchema(), std::vector<std::string>({"f1", "f2"}));
ASSERT_EQ(ctx->GetReadFieldIds(), std::vector<int32_t>({0, 1}));
ASSERT_EQ(*predicate, *(ctx->GetPredicate()));
ASSERT_TRUE(ctx->EnablePredicateFilter());
ASSERT_TRUE(ctx->EnablePrefetch());
Expand Down
Loading
Loading