Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,11 @@ struct PAIMON_EXPORT Options {
/// "row-tracking.enabled" - Whether enable unique row id for append table. Default value is
/// "false".
static const char ROW_TRACKING_ENABLED[];
/// "row-tracking.partition-group-on-commit" - When row-tracking is enabled, whether to group
/// new file metas by partition before commit, so that assigned row IDs are contiguous within
/// each partition. This is useful if you want to build global indices on this table. Default
/// value is "true".
static const char ROW_TRACKING_PARTITION_GROUP_ON_COMMIT[];
/// "data-evolution.enabled" - Whether enable data evolution for row tracking table. Default
/// value is "false".
static const char DATA_EVOLUTION_ENABLED[];
Expand All @@ -363,6 +368,14 @@ struct PAIMON_EXPORT Options {
/// "blob-as-descriptor" - Read and write blob field using blob descriptor rather than blob
/// bytes. Default value is "false".
static const char BLOB_AS_DESCRIPTOR[];
/// "blob-field" - Specifies column names that should be stored as blob type. This is used
/// when you want to treat a BYTES column as a BLOB. Comma-separated field names.
/// Multiple blob fields are supported.
static const char BLOB_FIELD[];
// TODO(xinyu.lxy): support "blob-descriptor-field" - treat fields as BLOB and store as
// BlobDescriptor
// TODO(xinyu.lxy): support "blob-view-field" - treat fields as BLOB and resolve from upstream
// tables
/// "global-index.enabled" - Whether to enable global index for scan. Default value is "true".
static const char GLOBAL_INDEX_ENABLED[];
/// "global-index.thread-num" - The maximum number of concurrent scanner for global index. No
Expand Down
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ set(PAIMON_CORE_SRCS
core/io/key_value_meta_projection_consumer.cpp
core/io/key_value_projection_consumer.cpp
core/io/key_value_projection_reader.cpp
core/io/multiple_blob_file_writer.cpp
core/io/rolling_blob_file_writer.cpp
core/manifest/file_kind.cpp
core/manifest/file_source.cpp
Expand Down
48 changes: 36 additions & 12 deletions src/paimon/common/data/blob_defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,45 @@

#pragma once

#include <cstdint>

namespace paimon {

/// Blob file format constants shared between writer and reader.
///
/// A Blob field uses the 'large_binary' type as its underlying physical storage in Apache Arrow
/// Schema, and is marked as the Paimon Blob extension type by attaching specific
/// **KeyValueMetadata**. Only one blob field in one paimon table is allowed.
///
/// To create a Blob field:
/// @code
/// std::unordered_map<std::string, std::string> blob_metadata_map = {
/// {Blob::EXTENSION_TYPE_KEY, Blob::EXTENSION_TYPE_VALUE}
/// };
/// auto field = arrow::field("my_blob_field", arrow::large_binary(), false,
/// std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map));
/// @endcode
constexpr char BLOB_EXTENSION_TYPE_KEY[] = "paimon.extension.type";
constexpr char BLOB_EXTENSION_TYPE_VALUE[] = "paimon.type.blob";
/// **KeyValueMetadata**. Multiple blob fields in one paimon table are supported.
class BlobDefs {
public:
BlobDefs() = delete;
~BlobDefs() = delete;

/// To create a Blob field:
/// @code
/// std::unordered_map<std::string, std::string> blob_metadata_map = {
/// {Blob::EXTENSION_TYPE_KEY, Blob::EXTENSION_TYPE_VALUE}
/// };
/// auto field = arrow::field("my_blob_field", arrow::large_binary(), false,
/// std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map));
/// @endcode
/// Metadata key identifying a Paimon Blob extension type field.
static constexpr char kExtensionTypeKey[] = "paimon.extension.type";
/// Metadata value identifying a Paimon Blob extension type field.
static constexpr char kExtensionTypeValue[] = "paimon.type.blob";

/// A bin_length value of -1 in the index indicates a null blob entry.
static constexpr int64_t kNullBinLength = -1;
/// Blob file format version.
static constexpr int8_t kFileVersion = 1;
/// Magic number identifying the start of each blob bin.
static constexpr int32_t kMagicNumber = 1481511375;
/// Offset from the start of a bin to the actual blob content (magic number size).
static constexpr int32_t kContentStartOffset = 4;
/// Total metadata length per bin: magic(4) + bin_length(8) + crc32(4) = 16.
static constexpr int32_t kTotalMetaLength = 16;
/// Blob file header length: index_len(4) + version(1) = 5.
static constexpr uint32_t kBlobFileHeaderLength = 5;
};

} // namespace paimon
6 changes: 3 additions & 3 deletions src/paimon/common/data/blob_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ bool BlobUtils::IsBlobMetadata(const std::shared_ptr<const arrow::KeyValueMetada
if (!metadata) {
return false;
}
auto extension_name = metadata->Get(BLOB_EXTENSION_TYPE_KEY);
auto extension_name = metadata->Get(BlobDefs::kExtensionTypeKey);
if (!extension_name.ok()) {
return false;
}
return extension_name.ValueUnsafe() == BLOB_EXTENSION_TYPE_VALUE;
return extension_name.ValueUnsafe() == BlobDefs::kExtensionTypeValue;
}

bool BlobUtils::IsBlobFile(const std::string& file_name) {
Expand All @@ -110,7 +110,7 @@ bool BlobUtils::IsBlobFile(const std::string& file_name) {
std::shared_ptr<arrow::Field> BlobUtils::ToArrowField(
const std::string& field_name, bool nullable,
std::unordered_map<std::string, std::string> metadata) {
metadata[BLOB_EXTENSION_TYPE_KEY] = BLOB_EXTENSION_TYPE_VALUE;
metadata[BlobDefs::kExtensionTypeKey] = BlobDefs::kExtensionTypeValue;
return arrow::field(field_name, arrow::large_binary(), nullable,
std::make_shared<arrow::KeyValueMetadata>(metadata));
}
Expand Down
6 changes: 3 additions & 3 deletions src/paimon/common/data/blob_utils_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class BlobUtilsTest : public ::testing::Test {
private:
std::shared_ptr<arrow::KeyValueMetadata> CreateBlobMetadata() {
std::unordered_map<std::string, std::string> blob_metadata_map = {
{BLOB_EXTENSION_TYPE_KEY, BLOB_EXTENSION_TYPE_VALUE}};
{BlobDefs::kExtensionTypeKey, BlobDefs::kExtensionTypeValue}};
return std::make_shared<arrow::KeyValueMetadata>(blob_metadata_map);
}
};
Expand All @@ -39,11 +39,11 @@ TEST_F(BlobUtilsTest, IsBlobMetadata) {
EXPECT_TRUE(BlobUtils::IsBlobMetadata(correct_metadata));
EXPECT_FALSE(BlobUtils::IsBlobMetadata(nullptr));
std::unordered_map<std::string, std::string> wrong_metadata_map = {
{BLOB_EXTENSION_TYPE_KEY, "paimon.type.varchar"}};
{BlobDefs::kExtensionTypeKey, "paimon.type.varchar"}};
auto wrong_metadata = std::make_shared<arrow::KeyValueMetadata>(wrong_metadata_map);
EXPECT_FALSE(BlobUtils::IsBlobMetadata(wrong_metadata));
std::unordered_map<std::string, std::string> no_extension_metadata_map = {
{"other_key", BLOB_EXTENSION_TYPE_VALUE}};
{"other_key", BlobDefs::kExtensionTypeValue}};
auto no_extension_metadata =
std::make_shared<arrow::KeyValueMetadata>(no_extension_metadata_map);
EXPECT_FALSE(BlobUtils::IsBlobMetadata(no_extension_metadata));
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,12 @@ const char Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY[] = "data-file.external-pa
const char Options::DATA_FILE_PREFIX[] = "data-file.prefix";
const char Options::INDEX_FILE_IN_DATA_FILE_DIR[] = "index-file-in-data-file-dir";
const char Options::ROW_TRACKING_ENABLED[] = "row-tracking.enabled";
const char Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT[] =
"row-tracking.partition-group-on-commit";
const char Options::DATA_EVOLUTION_ENABLED[] = "data-evolution.enabled";
const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name";
const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor";
const char Options::BLOB_FIELD[] = "blob-field";
const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
const char Options::GLOBAL_INDEX_THREAD_NUM[] = "global-index.thread-num";
const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path";
Expand Down
51 changes: 30 additions & 21 deletions src/paimon/core/append/append_only_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "paimon/core/io/data_file_path_factory.h"
#include "paimon/core/io/data_file_writer.h"
#include "paimon/core/io/data_increment.h"
#include "paimon/core/io/multiple_blob_file_writer.h"
#include "paimon/core/io/rolling_blob_file_writer.h"
#include "paimon/core/io/rolling_file_writer.h"
#include "paimon/core/io/single_file_writer.h"
Expand Down Expand Up @@ -212,35 +213,43 @@ AppendOnlyWriter::SingleFileWriterCreator AppendOnlyWriter::GetBlobFileWriterCre

AppendOnlyWriter::RollingFileWriterResult AppendOnlyWriter::CreateRollingBlobWriter(
const BlobUtils::SeparatedSchemas& schemas) const {
if (schemas.blob_schema->num_fields() > RollingBlobFileWriter::EXPECTED_BLOB_FIELD_COUNT) {
return Status::Invalid("Limit exactly one blob field in one paimon table yet.");
}
// use a specialized writer that writes blob data to a separate rolling file.
::ArrowSchema arrow_schema;
ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); });
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schemas.blob_schema, &arrow_schema));
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileFormat> format,
FileFormatFactory::Get("blob", options_.ToMap()));
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<WriterBuilder> writer_builder,
format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize()));
writer_builder->WithMemoryPool(memory_pool_);
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schemas.blob_schema, &arrow_schema));
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FormatStatsExtractor> stats_extractor,
format->CreateStatsExtractor(&arrow_schema));

auto single_blob_file_writer_creator = GetBlobFileWriterCreator(
writer_builder, stats_extractor, schemas.blob_schema->field_names());
auto rolling_blob_file_writer_creator = [this, single_blob_file_writer_creator]()
// Multiple blob fields are supported. Each blob field gets its own rolling file writer
// via MultipleBlobFileWriter.
auto blob_schema = schemas.blob_schema;
auto blob_writer_creator = [this, blob_schema](const std::string& blob_field_name)
-> Result<
std::unique_ptr<RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>> {
// Create a single-field schema for this blob field
auto field = blob_schema->GetFieldByName(blob_field_name);
if (!field) {
return Status::Invalid(
fmt::format("Blob field '{}' not found in blob schema", blob_field_name));
}
auto single_field_schema = arrow::schema({field});
::ArrowSchema arrow_schema;
ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); });
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*single_field_schema, &arrow_schema));
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<FileFormat> format,
FileFormatFactory::Get("blob", options_.ToMap()));
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<WriterBuilder> writer_builder,
format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize()));
writer_builder->WithMemoryPool(memory_pool_);
PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*single_field_schema, &arrow_schema));
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FormatStatsExtractor> stats_extractor,
format->CreateStatsExtractor(&arrow_schema));

std::vector<std::string> write_cols = {blob_field_name};
auto single_blob_file_writer_creator =
GetBlobFileWriterCreator(writer_builder, stats_extractor, write_cols);
return std::make_unique<RollingFileWriter<::ArrowArray*, std::shared_ptr<DataFileMeta>>>(
options_.GetBlobTargetFileSize(), single_blob_file_writer_creator);
};

return std::make_unique<RollingBlobFileWriter>(
options_.GetTargetFileSize(/*has_primary_key=*/false),
GetDataFileWriterCreator(schemas.main_schema, schemas.main_schema->field_names()),
rolling_blob_file_writer_creator, arrow::struct_(write_schema_->fields()));
blob_schema, blob_writer_creator, arrow::struct_(write_schema_->fields()));
}

Status AppendOnlyWriter::Sync() {
Expand Down
18 changes: 14 additions & 4 deletions src/paimon/core/append/append_only_writer_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ TEST_F(AppendOnlyWriterTest, TestWriteWithSingleBlobField) {
ASSERT_OK(writer.Close());
}

TEST_F(AppendOnlyWriterTest, TestWriteWithMultipleBlobFieldsShouldFail) {
TEST_F(AppendOnlyWriterTest, TestWriteWithMultipleBlobFields) {
auto options =
CreateOptions({{Options::FILE_FORMAT, "orc"}, {Options::MANIFEST_FORMAT, "orc"}});
auto dir = UniqueTestDirectory::Create();
Expand All @@ -663,9 +663,19 @@ TEST_F(AppendOnlyWriterTest, TestWriteWithMultipleBlobFieldsShouldFail) {
ASSERT_TRUE(blob_builder2.Append("b", 1).ok());
auto blob_array2 = blob_builder2.Finish().ValueOrDie();

ASSERT_NOK_WITH_MSG(
writer.Write(CreateStructBatch(schema, {int_array, blob_array1, blob_array2})),
"Limit exactly one blob field in one paimon table yet.");
ASSERT_OK(writer.Write(CreateStructBatch(schema, {int_array, blob_array1, blob_array2})));
ASSERT_OK_AND_ASSIGN(CommitIncrement inc, writer.PrepareCommit(/*wait_compaction=*/true));

ASSERT_EQ(inc.GetNewFilesIncrement().NewFiles().size(), 3);
const auto& main_file = inc.GetNewFilesIncrement().NewFiles()[0];
const auto& blob_file1 = inc.GetNewFilesIncrement().NewFiles()[1];
const auto& blob_file2 = inc.GetNewFilesIncrement().NewFiles()[2];
ASSERT_TRUE(
options.GetFileSystem()->Exists(path_factory->ToPath(main_file->file_name)).value());
ASSERT_TRUE(
options.GetFileSystem()->Exists(path_factory->ToPath(blob_file1->file_name)).value());
ASSERT_TRUE(
options.GetFileSystem()->Exists(path_factory->ToPath(blob_file2->file_name)).value());
ASSERT_OK(writer.Close());
}

Expand Down
16 changes: 16 additions & 0 deletions src/paimon/core/core_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ struct CoreOptions::Impl {
ExpireConfig expire_config;
std::vector<std::string> sequence_field;
std::vector<std::string> remove_record_on_sequence_group;
std::vector<std::string> blob_fields;

std::string partition_default_name = "__DEFAULT_PARTITION__";
StartupMode startup_mode = StartupMode::Default();
Expand Down Expand Up @@ -430,6 +431,7 @@ struct CoreOptions::Impl {
bool enable_adaptive_prefetch_strategy = true;
bool index_file_in_data_file_dir = false;
bool row_tracking_enabled = false;
bool row_tracking_partition_group_on_commit = true;
bool data_evolution_enabled = false;
bool legacy_partition_name_enabled = true;
bool global_index_enabled = true;
Expand Down Expand Up @@ -525,11 +527,17 @@ struct CoreOptions::Impl {
// Parse row-tracking.enabled - whether to enable unique row id for append table
PAIMON_RETURN_NOT_OK(
parser.Parse<bool>(Options::ROW_TRACKING_ENABLED, &row_tracking_enabled));
// Parse row-tracking.partition-group-on-commit - whether to group delta files by partition
PAIMON_RETURN_NOT_OK(parser.Parse<bool>(Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT,
&row_tracking_partition_group_on_commit));
// Parse data-evolution.enabled - whether to enable data evolution for row tracking
PAIMON_RETURN_NOT_OK(
parser.Parse<bool>(Options::DATA_EVOLUTION_ENABLED, &data_evolution_enabled));
// Parse bucket-function - bucket function type, default "DEFAULT"
PAIMON_RETURN_NOT_OK(parser.ParseBucketFunctionType(&bucket_function_type));
// Parse blob-field - column names to store as blob type, comma separated
PAIMON_RETURN_NOT_OK(parser.ParseList<std::string>(
Options::BLOB_FIELD, Options::FIELDS_SEPARATOR, &blob_fields));
return Status::OK();
}

Expand Down Expand Up @@ -1279,6 +1287,10 @@ bool CoreOptions::RowTrackingEnabled() const {
return impl_->row_tracking_enabled;
}

bool CoreOptions::RowTrackingPartitionGroupOnCommit() const {
return impl_->row_tracking_partition_group_on_commit;
}

bool CoreOptions::DataEvolutionEnabled() const {
return impl_->data_evolution_enabled;
}
Expand Down Expand Up @@ -1373,6 +1385,10 @@ BucketFunctionType CoreOptions::GetBucketFunctionType() const {
return impl_->bucket_function_type;
}

const std::vector<std::string>& CoreOptions::GetBlobFields() const {
return impl_->blob_fields;
}

int64_t CoreOptions::GetLookupCacheFileRetentionMs() const {
return impl_->lookup_cache_file_retention_ms;
}
Expand Down
3 changes: 3 additions & 0 deletions src/paimon/core/core_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class PAIMON_EXPORT CoreOptions {
bool IndexFileInDataFileDir() const;

bool RowTrackingEnabled() const;
bool RowTrackingPartitionGroupOnCommit() const;
bool DataEvolutionEnabled() const;

bool LegacyPartitionNameEnabled() const;
Expand Down Expand Up @@ -179,6 +180,8 @@ class PAIMON_EXPORT CoreOptions {
BucketFunctionType GetBucketFunctionType() const;
std::optional<int32_t> GetGlobalIndexThreadNum() const;

const std::vector<std::string>& GetBlobFields() const;

const std::map<std::string, std::string>& ToMap() const;

private:
Expand Down
6 changes: 6 additions & 0 deletions src/paimon/core/core_options_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ TEST(CoreOptionsTest, TestDefaultValue) {
ASSERT_EQ(core_options.DataFilePrefix(), "data-");
ASSERT_FALSE(core_options.IndexFileInDataFileDir());
ASSERT_FALSE(core_options.RowTrackingEnabled());
ASSERT_TRUE(core_options.RowTrackingPartitionGroupOnCommit());
ASSERT_FALSE(core_options.DataEvolutionEnabled());
ASSERT_TRUE(core_options.GetBlobFields().empty());
ASSERT_TRUE(core_options.LegacyPartitionNameEnabled());
ASSERT_TRUE(core_options.GlobalIndexEnabled());
ASSERT_EQ(std::nullopt, core_options.GetGlobalIndexExternalPath());
Expand Down Expand Up @@ -209,7 +211,9 @@ TEST(CoreOptionsTest, TestFromMap) {
{Options::DATA_FILE_PREFIX, "test-data-"},
{Options::INDEX_FILE_IN_DATA_FILE_DIR, "true"},
{Options::ROW_TRACKING_ENABLED, "true"},
{Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT, "false"},
{Options::DATA_EVOLUTION_ENABLED, "true"},
{Options::BLOB_FIELD, "blob1,blob2"},
{Options::PARTITION_GENERATE_LEGACY_NAME, "false"},
{Options::GLOBAL_INDEX_ENABLED, "false"},
{Options::GLOBAL_INDEX_THREAD_NUM, "4"},
Expand Down Expand Up @@ -336,7 +340,9 @@ TEST(CoreOptionsTest, TestFromMap) {
ASSERT_EQ(core_options.DataFilePrefix(), "test-data-");
ASSERT_TRUE(core_options.IndexFileInDataFileDir());
ASSERT_TRUE(core_options.RowTrackingEnabled());
ASSERT_FALSE(core_options.RowTrackingPartitionGroupOnCommit());
ASSERT_TRUE(core_options.DataEvolutionEnabled());
ASSERT_EQ(core_options.GetBlobFields(), std::vector<std::string>({"blob1", "blob2"}));
ASSERT_FALSE(core_options.LegacyPartitionNameEnabled());
ASSERT_FALSE(core_options.GlobalIndexEnabled());
ASSERT_EQ(core_options.GetGlobalIndexThreadNum(), 4);
Expand Down
Loading
Loading