diff --git a/include/paimon/defs.h b/include/paimon/defs.h index 1d1b60208..54bbab62a 100644 --- a/include/paimon/defs.h +++ b/include/paimon/defs.h @@ -354,6 +354,11 @@ struct PAIMON_EXPORT Options { /// "row-tracking.enabled" - Whether enable unique row id for append table. Default value is /// "false". static const char ROW_TRACKING_ENABLED[]; + /// "row-tracking.partition-group-on-commit" - When row-tracking is enabled, whether to group + /// new file metas by partition before commit, so that assigned row IDs are contiguous within + /// each partition. This is useful if you want to build global indices on this table. Default + /// value is "true". + static const char ROW_TRACKING_PARTITION_GROUP_ON_COMMIT[]; /// "data-evolution.enabled" - Whether enable data evolution for row tracking table. Default /// value is "false". static const char DATA_EVOLUTION_ENABLED[]; @@ -363,6 +368,14 @@ struct PAIMON_EXPORT Options { /// "blob-as-descriptor" - Read and write blob field using blob descriptor rather than blob /// bytes. Default value is "false". static const char BLOB_AS_DESCRIPTOR[]; + /// "blob-field" - Specifies column names that should be stored as blob type. This is used + /// when you want to treat a BYTES column as a BLOB. Comma-separated field names. + /// Multiple blob fields are supported. + static const char BLOB_FIELD[]; + // TODO(xinyu.lxy): support "blob-descriptor-field" - treat fields as BLOB and store as + // BlobDescriptor + // TODO(xinyu.lxy): support "blob-view-field" - treat fields as BLOB and resolve from upstream + // tables /// "global-index.enabled" - Whether to enable global index for scan. Default value is "true". static const char GLOBAL_INDEX_ENABLED[]; /// "global-index.thread-num" - The maximum number of concurrent scanner for global index. No diff --git a/src/paimon/CMakeLists.txt b/src/paimon/CMakeLists.txt index d693d1ec5..45d7bcb7d 100644 --- a/src/paimon/CMakeLists.txt +++ b/src/paimon/CMakeLists.txt @@ -220,6 +220,7 @@ set(PAIMON_CORE_SRCS core/io/key_value_meta_projection_consumer.cpp core/io/key_value_projection_consumer.cpp core/io/key_value_projection_reader.cpp + core/io/multiple_blob_file_writer.cpp core/io/rolling_blob_file_writer.cpp core/manifest/file_kind.cpp core/manifest/file_source.cpp diff --git a/src/paimon/common/data/blob_defs.h b/src/paimon/common/data/blob_defs.h index ca24d850e..ba595cdfd 100644 --- a/src/paimon/common/data/blob_defs.h +++ b/src/paimon/common/data/blob_defs.h @@ -16,21 +16,45 @@ #pragma once +#include + namespace paimon { +/// Blob file format constants shared between writer and reader. +/// /// A Blob field uses the 'large_binary' type as its underlying physical storage in Apache Arrow /// Schema, and is marked as the Paimon Blob extension type by attaching specific -/// **KeyValueMetadata**. Only one blob field in one paimon table is allowed. -/// -/// To create a Blob field: -/// @code -/// std::unordered_map blob_metadata_map = { -/// {Blob::EXTENSION_TYPE_KEY, Blob::EXTENSION_TYPE_VALUE} -/// }; -/// auto field = arrow::field("my_blob_field", arrow::large_binary(), false, -/// std::make_shared(blob_metadata_map)); -/// @endcode -constexpr char BLOB_EXTENSION_TYPE_KEY[] = "paimon.extension.type"; -constexpr char BLOB_EXTENSION_TYPE_VALUE[] = "paimon.type.blob"; +/// **KeyValueMetadata**. Multiple blob fields in one paimon table are supported. +class BlobDefs { + public: + BlobDefs() = delete; + ~BlobDefs() = delete; + + /// To create a Blob field: + /// @code + /// std::unordered_map blob_metadata_map = { + /// {Blob::EXTENSION_TYPE_KEY, Blob::EXTENSION_TYPE_VALUE} + /// }; + /// auto field = arrow::field("my_blob_field", arrow::large_binary(), false, + /// std::make_shared(blob_metadata_map)); + /// @endcode + /// Metadata key identifying a Paimon Blob extension type field. + static constexpr char kExtensionTypeKey[] = "paimon.extension.type"; + /// Metadata value identifying a Paimon Blob extension type field. + static constexpr char kExtensionTypeValue[] = "paimon.type.blob"; + + /// A bin_length value of -1 in the index indicates a null blob entry. + static constexpr int64_t kNullBinLength = -1; + /// Blob file format version. + static constexpr int8_t kFileVersion = 1; + /// Magic number identifying the start of each blob bin. + static constexpr int32_t kMagicNumber = 1481511375; + /// Offset from the start of a bin to the actual blob content (magic number size). + static constexpr int32_t kContentStartOffset = 4; + /// Total metadata length per bin: magic(4) + bin_length(8) + crc32(4) = 16. + static constexpr int32_t kTotalMetaLength = 16; + /// Blob file header length: index_len(4) + version(1) = 5. + static constexpr uint32_t kBlobFileHeaderLength = 5; +}; } // namespace paimon diff --git a/src/paimon/common/data/blob_utils.cpp b/src/paimon/common/data/blob_utils.cpp index 50015b704..84835071d 100644 --- a/src/paimon/common/data/blob_utils.cpp +++ b/src/paimon/common/data/blob_utils.cpp @@ -96,11 +96,11 @@ bool BlobUtils::IsBlobMetadata(const std::shared_ptrGet(BLOB_EXTENSION_TYPE_KEY); + auto extension_name = metadata->Get(BlobDefs::kExtensionTypeKey); if (!extension_name.ok()) { return false; } - return extension_name.ValueUnsafe() == BLOB_EXTENSION_TYPE_VALUE; + return extension_name.ValueUnsafe() == BlobDefs::kExtensionTypeValue; } bool BlobUtils::IsBlobFile(const std::string& file_name) { @@ -110,7 +110,7 @@ bool BlobUtils::IsBlobFile(const std::string& file_name) { std::shared_ptr BlobUtils::ToArrowField( const std::string& field_name, bool nullable, std::unordered_map metadata) { - metadata[BLOB_EXTENSION_TYPE_KEY] = BLOB_EXTENSION_TYPE_VALUE; + metadata[BlobDefs::kExtensionTypeKey] = BlobDefs::kExtensionTypeValue; return arrow::field(field_name, arrow::large_binary(), nullable, std::make_shared(metadata)); } diff --git a/src/paimon/common/data/blob_utils_test.cpp b/src/paimon/common/data/blob_utils_test.cpp index de128cef1..f8835379d 100644 --- a/src/paimon/common/data/blob_utils_test.cpp +++ b/src/paimon/common/data/blob_utils_test.cpp @@ -29,7 +29,7 @@ class BlobUtilsTest : public ::testing::Test { private: std::shared_ptr CreateBlobMetadata() { std::unordered_map blob_metadata_map = { - {BLOB_EXTENSION_TYPE_KEY, BLOB_EXTENSION_TYPE_VALUE}}; + {BlobDefs::kExtensionTypeKey, BlobDefs::kExtensionTypeValue}}; return std::make_shared(blob_metadata_map); } }; @@ -39,11 +39,11 @@ TEST_F(BlobUtilsTest, IsBlobMetadata) { EXPECT_TRUE(BlobUtils::IsBlobMetadata(correct_metadata)); EXPECT_FALSE(BlobUtils::IsBlobMetadata(nullptr)); std::unordered_map wrong_metadata_map = { - {BLOB_EXTENSION_TYPE_KEY, "paimon.type.varchar"}}; + {BlobDefs::kExtensionTypeKey, "paimon.type.varchar"}}; auto wrong_metadata = std::make_shared(wrong_metadata_map); EXPECT_FALSE(BlobUtils::IsBlobMetadata(wrong_metadata)); std::unordered_map no_extension_metadata_map = { - {"other_key", BLOB_EXTENSION_TYPE_VALUE}}; + {"other_key", BlobDefs::kExtensionTypeValue}}; auto no_extension_metadata = std::make_shared(no_extension_metadata_map); EXPECT_FALSE(BlobUtils::IsBlobMetadata(no_extension_metadata)); diff --git a/src/paimon/common/defs.cpp b/src/paimon/common/defs.cpp index 4fedb7da0..7a75c549d 100644 --- a/src/paimon/common/defs.cpp +++ b/src/paimon/common/defs.cpp @@ -87,9 +87,12 @@ const char Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY[] = "data-file.external-pa const char Options::DATA_FILE_PREFIX[] = "data-file.prefix"; const char Options::INDEX_FILE_IN_DATA_FILE_DIR[] = "index-file-in-data-file-dir"; const char Options::ROW_TRACKING_ENABLED[] = "row-tracking.enabled"; +const char Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT[] = + "row-tracking.partition-group-on-commit"; const char Options::DATA_EVOLUTION_ENABLED[] = "data-evolution.enabled"; const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name"; const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor"; +const char Options::BLOB_FIELD[] = "blob-field"; const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled"; const char Options::GLOBAL_INDEX_THREAD_NUM[] = "global-index.thread-num"; const char Options::GLOBAL_INDEX_EXTERNAL_PATH[] = "global-index.external-path"; diff --git a/src/paimon/core/append/append_only_writer.cpp b/src/paimon/core/append/append_only_writer.cpp index 411d2246e..a8ab13ea2 100644 --- a/src/paimon/core/append/append_only_writer.cpp +++ b/src/paimon/core/append/append_only_writer.cpp @@ -33,6 +33,7 @@ #include "paimon/core/io/data_file_path_factory.h" #include "paimon/core/io/data_file_writer.h" #include "paimon/core/io/data_increment.h" +#include "paimon/core/io/multiple_blob_file_writer.h" #include "paimon/core/io/rolling_blob_file_writer.h" #include "paimon/core/io/rolling_file_writer.h" #include "paimon/core/io/single_file_writer.h" @@ -212,35 +213,43 @@ AppendOnlyWriter::SingleFileWriterCreator AppendOnlyWriter::GetBlobFileWriterCre AppendOnlyWriter::RollingFileWriterResult AppendOnlyWriter::CreateRollingBlobWriter( const BlobUtils::SeparatedSchemas& schemas) const { - if (schemas.blob_schema->num_fields() > RollingBlobFileWriter::EXPECTED_BLOB_FIELD_COUNT) { - return Status::Invalid("Limit exactly one blob field in one paimon table yet."); - } - // use a specialized writer that writes blob data to a separate rolling file. - ::ArrowSchema arrow_schema; - ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); }); - PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schemas.blob_schema, &arrow_schema)); - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr format, - FileFormatFactory::Get("blob", options_.ToMap())); - PAIMON_ASSIGN_OR_RAISE( - std::shared_ptr writer_builder, - format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize())); - writer_builder->WithMemoryPool(memory_pool_); - PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schemas.blob_schema, &arrow_schema)); - PAIMON_ASSIGN_OR_RAISE(std::shared_ptr stats_extractor, - format->CreateStatsExtractor(&arrow_schema)); - - auto single_blob_file_writer_creator = GetBlobFileWriterCreator( - writer_builder, stats_extractor, schemas.blob_schema->field_names()); - auto rolling_blob_file_writer_creator = [this, single_blob_file_writer_creator]() + // Multiple blob fields are supported. Each blob field gets its own rolling file writer + // via MultipleBlobFileWriter. + auto blob_schema = schemas.blob_schema; + auto blob_writer_creator = [this, blob_schema](const std::string& blob_field_name) -> Result< std::unique_ptr>>> { + // Create a single-field schema for this blob field + auto field = blob_schema->GetFieldByName(blob_field_name); + if (!field) { + return Status::Invalid( + fmt::format("Blob field '{}' not found in blob schema", blob_field_name)); + } + auto single_field_schema = arrow::schema({field}); + ::ArrowSchema arrow_schema; + ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); }); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*single_field_schema, &arrow_schema)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr format, + FileFormatFactory::Get("blob", options_.ToMap())); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr writer_builder, + format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize())); + writer_builder->WithMemoryPool(memory_pool_); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*single_field_schema, &arrow_schema)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr stats_extractor, + format->CreateStatsExtractor(&arrow_schema)); + + std::vector write_cols = {blob_field_name}; + auto single_blob_file_writer_creator = + GetBlobFileWriterCreator(writer_builder, stats_extractor, write_cols); return std::make_unique>>( options_.GetBlobTargetFileSize(), single_blob_file_writer_creator); }; + return std::make_unique( options_.GetTargetFileSize(/*has_primary_key=*/false), GetDataFileWriterCreator(schemas.main_schema, schemas.main_schema->field_names()), - rolling_blob_file_writer_creator, arrow::struct_(write_schema_->fields())); + blob_schema, blob_writer_creator, arrow::struct_(write_schema_->fields())); } Status AppendOnlyWriter::Sync() { diff --git a/src/paimon/core/append/append_only_writer_test.cpp b/src/paimon/core/append/append_only_writer_test.cpp index 866a941b8..5d63f98f4 100644 --- a/src/paimon/core/append/append_only_writer_test.cpp +++ b/src/paimon/core/append/append_only_writer_test.cpp @@ -639,7 +639,7 @@ TEST_F(AppendOnlyWriterTest, TestWriteWithSingleBlobField) { ASSERT_OK(writer.Close()); } -TEST_F(AppendOnlyWriterTest, TestWriteWithMultipleBlobFieldsShouldFail) { +TEST_F(AppendOnlyWriterTest, TestWriteWithMultipleBlobFields) { auto options = CreateOptions({{Options::FILE_FORMAT, "orc"}, {Options::MANIFEST_FORMAT, "orc"}}); auto dir = UniqueTestDirectory::Create(); @@ -663,9 +663,19 @@ TEST_F(AppendOnlyWriterTest, TestWriteWithMultipleBlobFieldsShouldFail) { ASSERT_TRUE(blob_builder2.Append("b", 1).ok()); auto blob_array2 = blob_builder2.Finish().ValueOrDie(); - ASSERT_NOK_WITH_MSG( - writer.Write(CreateStructBatch(schema, {int_array, blob_array1, blob_array2})), - "Limit exactly one blob field in one paimon table yet."); + ASSERT_OK(writer.Write(CreateStructBatch(schema, {int_array, blob_array1, blob_array2}))); + ASSERT_OK_AND_ASSIGN(CommitIncrement inc, writer.PrepareCommit(/*wait_compaction=*/true)); + + ASSERT_EQ(inc.GetNewFilesIncrement().NewFiles().size(), 3); + const auto& main_file = inc.GetNewFilesIncrement().NewFiles()[0]; + const auto& blob_file1 = inc.GetNewFilesIncrement().NewFiles()[1]; + const auto& blob_file2 = inc.GetNewFilesIncrement().NewFiles()[2]; + ASSERT_TRUE( + options.GetFileSystem()->Exists(path_factory->ToPath(main_file->file_name)).value()); + ASSERT_TRUE( + options.GetFileSystem()->Exists(path_factory->ToPath(blob_file1->file_name)).value()); + ASSERT_TRUE( + options.GetFileSystem()->Exists(path_factory->ToPath(blob_file2->file_name)).value()); ASSERT_OK(writer.Close()); } diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp index 3c96aeae9..c46bf98a6 100644 --- a/src/paimon/core/core_options.cpp +++ b/src/paimon/core/core_options.cpp @@ -374,6 +374,7 @@ struct CoreOptions::Impl { ExpireConfig expire_config; std::vector sequence_field; std::vector remove_record_on_sequence_group; + std::vector blob_fields; std::string partition_default_name = "__DEFAULT_PARTITION__"; StartupMode startup_mode = StartupMode::Default(); @@ -430,6 +431,7 @@ struct CoreOptions::Impl { bool enable_adaptive_prefetch_strategy = true; bool index_file_in_data_file_dir = false; bool row_tracking_enabled = false; + bool row_tracking_partition_group_on_commit = true; bool data_evolution_enabled = false; bool legacy_partition_name_enabled = true; bool global_index_enabled = true; @@ -525,11 +527,17 @@ struct CoreOptions::Impl { // Parse row-tracking.enabled - whether to enable unique row id for append table PAIMON_RETURN_NOT_OK( parser.Parse(Options::ROW_TRACKING_ENABLED, &row_tracking_enabled)); + // Parse row-tracking.partition-group-on-commit - whether to group delta files by partition + PAIMON_RETURN_NOT_OK(parser.Parse(Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT, + &row_tracking_partition_group_on_commit)); // Parse data-evolution.enabled - whether to enable data evolution for row tracking PAIMON_RETURN_NOT_OK( parser.Parse(Options::DATA_EVOLUTION_ENABLED, &data_evolution_enabled)); // Parse bucket-function - bucket function type, default "DEFAULT" PAIMON_RETURN_NOT_OK(parser.ParseBucketFunctionType(&bucket_function_type)); + // Parse blob-field - column names to store as blob type, comma separated + PAIMON_RETURN_NOT_OK(parser.ParseList( + Options::BLOB_FIELD, Options::FIELDS_SEPARATOR, &blob_fields)); return Status::OK(); } @@ -1279,6 +1287,10 @@ bool CoreOptions::RowTrackingEnabled() const { return impl_->row_tracking_enabled; } +bool CoreOptions::RowTrackingPartitionGroupOnCommit() const { + return impl_->row_tracking_partition_group_on_commit; +} + bool CoreOptions::DataEvolutionEnabled() const { return impl_->data_evolution_enabled; } @@ -1373,6 +1385,10 @@ BucketFunctionType CoreOptions::GetBucketFunctionType() const { return impl_->bucket_function_type; } +const std::vector& CoreOptions::GetBlobFields() const { + return impl_->blob_fields; +} + int64_t CoreOptions::GetLookupCacheFileRetentionMs() const { return impl_->lookup_cache_file_retention_ms; } diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h index 81235ebb7..3f96c9c9f 100644 --- a/src/paimon/core/core_options.h +++ b/src/paimon/core/core_options.h @@ -144,6 +144,7 @@ class PAIMON_EXPORT CoreOptions { bool IndexFileInDataFileDir() const; bool RowTrackingEnabled() const; + bool RowTrackingPartitionGroupOnCommit() const; bool DataEvolutionEnabled() const; bool LegacyPartitionNameEnabled() const; @@ -179,6 +180,8 @@ class PAIMON_EXPORT CoreOptions { BucketFunctionType GetBucketFunctionType() const; std::optional GetGlobalIndexThreadNum() const; + const std::vector& GetBlobFields() const; + const std::map& ToMap() const; private: diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp index 0a3466547..be2fab72e 100644 --- a/src/paimon/core/core_options_test.cpp +++ b/src/paimon/core/core_options_test.cpp @@ -111,7 +111,9 @@ TEST(CoreOptionsTest, TestDefaultValue) { ASSERT_EQ(core_options.DataFilePrefix(), "data-"); ASSERT_FALSE(core_options.IndexFileInDataFileDir()); ASSERT_FALSE(core_options.RowTrackingEnabled()); + ASSERT_TRUE(core_options.RowTrackingPartitionGroupOnCommit()); ASSERT_FALSE(core_options.DataEvolutionEnabled()); + ASSERT_TRUE(core_options.GetBlobFields().empty()); ASSERT_TRUE(core_options.LegacyPartitionNameEnabled()); ASSERT_TRUE(core_options.GlobalIndexEnabled()); ASSERT_EQ(std::nullopt, core_options.GetGlobalIndexExternalPath()); @@ -209,7 +211,9 @@ TEST(CoreOptionsTest, TestFromMap) { {Options::DATA_FILE_PREFIX, "test-data-"}, {Options::INDEX_FILE_IN_DATA_FILE_DIR, "true"}, {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT, "false"}, {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_FIELD, "blob1,blob2"}, {Options::PARTITION_GENERATE_LEGACY_NAME, "false"}, {Options::GLOBAL_INDEX_ENABLED, "false"}, {Options::GLOBAL_INDEX_THREAD_NUM, "4"}, @@ -336,7 +340,9 @@ TEST(CoreOptionsTest, TestFromMap) { ASSERT_EQ(core_options.DataFilePrefix(), "test-data-"); ASSERT_TRUE(core_options.IndexFileInDataFileDir()); ASSERT_TRUE(core_options.RowTrackingEnabled()); + ASSERT_FALSE(core_options.RowTrackingPartitionGroupOnCommit()); ASSERT_TRUE(core_options.DataEvolutionEnabled()); + ASSERT_EQ(core_options.GetBlobFields(), std::vector({"blob1", "blob2"})); ASSERT_FALSE(core_options.LegacyPartitionNameEnabled()); ASSERT_FALSE(core_options.GlobalIndexEnabled()); ASSERT_EQ(core_options.GetGlobalIndexThreadNum(), 4); diff --git a/src/paimon/core/io/multiple_blob_file_writer.cpp b/src/paimon/core/io/multiple_blob_file_writer.cpp new file mode 100644 index 000000000..868f5174d --- /dev/null +++ b/src/paimon/core/io/multiple_blob_file_writer.cpp @@ -0,0 +1,118 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/io/multiple_blob_file_writer.h" + +#include +#include +#include + +#include "arrow/array/array_nested.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "arrow/type.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/macros.h" + +namespace paimon { + +MultipleBlobFileWriter::MultipleBlobFileWriter(const std::shared_ptr& blob_schema, + BlobWriterCreator blob_writer_creator) + : blob_schema_(blob_schema), + blob_writer_creator_(std::move(blob_writer_creator)), + logger_(Logger::GetLogger("MultipleBlobFileWriter")) {} + +Status MultipleBlobFileWriter::Write(::ArrowArray* record) { + // Lazily initialize per-field blob writers on first write + if (blob_field_writers_.empty()) { + for (int32_t i = 0; i < blob_schema_->num_fields(); ++i) { + const std::string& field_name = blob_schema_->field(i)->name(); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr writer, + blob_writer_creator_(field_name)); + blob_field_writers_.push_back(BlobFieldWriter{field_name, i, std::move(writer)}); + } + } + + // Import the ArrowArray as a StructArray containing all blob fields + std::shared_ptr data_type = arrow::struct_(blob_schema_->fields()); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_array, + arrow::ImportArray(record, data_type)); + std::shared_ptr struct_array = + std::dynamic_pointer_cast(arrow_array); + if (!struct_array) { + return Status::Invalid("MultipleBlobFileWriter: input is not a StructArray"); + } + + // TODO(xinyu.lxy): support write parallel + // For each blob field, extract the column and write row by row to its dedicated writer + for (BlobFieldWriter& field_writer : blob_field_writers_) { + std::shared_ptr field_array = struct_array->field(field_writer.field_index); + // Create a single-field StructArray for each row and write to the blob writer + for (int64_t row = 0; row < field_array->length(); ++row) { + std::shared_ptr slice = field_array->Slice(row, 1); + // Wrap single field into a StructArray with the same field name + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr single_field_struct, + arrow::StructArray::Make({slice}, {field_writer.field_name})); + ::ArrowArray c_blob_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::ExportArray(*single_field_struct, &c_blob_array)); + ScopeGuard guard([&c_blob_array]() { ArrowArrayRelease(&c_blob_array); }); + PAIMON_RETURN_NOT_OK(field_writer.writer->Write(&c_blob_array)); + } + } + + return Status::OK(); +} + +void MultipleBlobFileWriter::Abort() { + for (auto& field_writer : blob_field_writers_) { + if (field_writer.writer) { + field_writer.writer->Abort(); + } + } + blob_field_writers_.clear(); +} + +Status MultipleBlobFileWriter::Close() { + if (closed_) { + return Status::OK(); + } + for (auto& field_writer : blob_field_writers_) { + if (field_writer.writer) { + PAIMON_RETURN_NOT_OK(field_writer.writer->Close()); + } + } + closed_ = true; + return Status::OK(); +} + +Result>> MultipleBlobFileWriter::GetResult() { + std::vector> all_results; + for (BlobFieldWriter& field_writer : blob_field_writers_) { + if (field_writer.writer) { + PAIMON_ASSIGN_OR_RAISE(std::vector> results, + field_writer.writer->GetResult()); + all_results.insert(all_results.end(), results.begin(), results.end()); + } + } + blob_field_writers_.clear(); + return all_results; +} + +} // namespace paimon diff --git a/src/paimon/core/io/multiple_blob_file_writer.h b/src/paimon/core/io/multiple_blob_file_writer.h new file mode 100644 index 000000000..d84ebcfaf --- /dev/null +++ b/src/paimon/core/io/multiple_blob_file_writer.h @@ -0,0 +1,94 @@ +/* + * Copyright 2026-present Alibaba Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/rolling_file_writer.h" +#include "paimon/core/io/single_file_writer.h" +#include "paimon/logging.h" +#include "paimon/result.h" +#include "paimon/status.h" + +struct ArrowArray; + +namespace arrow { +class Schema; +class StructArray; +} // namespace arrow + +namespace paimon { + +/// A blob file writer that manages multiple blob fields, each written to separate rolling files. +/// +/// For each blob field in the schema, a dedicated RollingFileWriter is created. When writing a +/// row, the writer projects out each blob field and writes it to the corresponding blob file +/// independently. +/// +/// This design supports multiple blob columns in a single table, where each blob column produces +/// its own set of blob files that are rolled independently based on target file size. +class MultipleBlobFileWriter { + public: + using BlobRollingWriter = RollingFileWriter<::ArrowArray*, std::shared_ptr>; + using BlobWriterCreator = std::function>( + const std::string& blob_field_name)>; + + /// Constructs a MultipleBlobFileWriter. + /// @param blob_schema The schema containing only blob fields. + /// @param blob_writer_creator Factory function to create a RollingFileWriter for each blob + /// field. + MultipleBlobFileWriter(const std::shared_ptr& blob_schema, + BlobWriterCreator blob_writer_creator); + + ~MultipleBlobFileWriter() = default; + + /// Writes a batch of blob data. The input ArrowArray should contain all blob fields as a + /// StructArray. Each blob field is extracted and written to its dedicated rolling file writer + /// row by row. + Status Write(::ArrowArray* record); + + /// Aborts all blob writers and releases resources. + void Abort(); + + /// Closes all blob writers. + Status Close(); + + /// Returns the results (DataFileMeta) from all blob writers. + Result>> GetResult(); + + private: + /// Internal per-field blob writer. + struct BlobFieldWriter { + std::string field_name; + int32_t field_index; + std::unique_ptr writer; + }; + + std::shared_ptr blob_schema_; + BlobWriterCreator blob_writer_creator_; + std::vector blob_field_writers_; + bool closed_ = false; + + std::unique_ptr logger_; +}; + +} // namespace paimon diff --git a/src/paimon/core/io/rolling_blob_file_writer.cpp b/src/paimon/core/io/rolling_blob_file_writer.cpp index 9f10ec518..9019e7392 100644 --- a/src/paimon/core/io/rolling_blob_file_writer.cpp +++ b/src/paimon/core/io/rolling_blob_file_writer.cpp @@ -42,11 +42,13 @@ namespace paimon { RollingBlobFileWriter::RollingBlobFileWriter( int64_t target_file_size, std::function>()> create_file_writer, - std::function>()> create_blob_file_writer, + const std::shared_ptr& blob_schema, + MultipleBlobFileWriter::BlobWriterCreator blob_writer_creator, const std::shared_ptr& data_type) : RollingFileWriter<::ArrowArray*, std::shared_ptr>(target_file_size, create_file_writer), - create_blob_file_writer_(create_blob_file_writer), + blob_schema_(blob_schema), + blob_writer_creator_(std::move(blob_writer_creator)), data_type_(data_type), logger_(Logger::GetLogger("RollingBlobFileWriter")) {} @@ -57,7 +59,7 @@ Status RollingBlobFileWriter::Write(::ArrowArray* record) { PAIMON_RETURN_NOT_OK(OpenCurrentWriter()); } if (PAIMON_UNLIKELY(blob_writer_ == nullptr)) { - PAIMON_ASSIGN_OR_RAISE(blob_writer_, create_blob_file_writer_()); + blob_writer_ = std::make_unique(blob_schema_, blob_writer_creator_); } int64_t record_count = record->length; PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_array, @@ -66,20 +68,21 @@ Status RollingBlobFileWriter::Write(::ArrowArray* record) { PAIMON_ASSIGN_OR_RAISE(BlobUtils::SeparatedStructArrays separated_arrays, BlobUtils::SeparateBlobArray(struct_array)); + // Write main (non-blob) data ::ArrowArray c_main_array; PAIMON_RETURN_NOT_OK_FROM_ARROW( arrow::ExportArray(*separated_arrays.main_array, &c_main_array)); ScopeGuard array_lifecycle_guard( [&c_main_array]() -> void { ArrowArrayRelease(&c_main_array); }); PAIMON_RETURN_NOT_OK(current_writer_->Write(&c_main_array)); - for (auto i = 0; i < separated_arrays.blob_array->length(); i++) { - std::shared_ptr slice_array = separated_arrays.blob_array->Slice(i, 1); - ::ArrowArray c_blob_array; - ScopeGuard array_lifecycle_guard( - [&c_blob_array]() -> void { ArrowArrayRelease(&c_blob_array); }); - PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*slice_array, &c_blob_array)); - PAIMON_RETURN_NOT_OK(blob_writer_->Write(&c_blob_array)); - } + + // Write blob data via MultipleBlobFileWriter (each blob field independently) + ::ArrowArray c_blob_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::ExportArray(*separated_arrays.blob_array, &c_blob_array)); + ScopeGuard blob_array_guard([&c_blob_array]() -> void { ArrowArrayRelease(&c_blob_array); }); + PAIMON_RETURN_NOT_OK(blob_writer_->Write(&c_blob_array)); + record_count_ += record_count; PAIMON_ASSIGN_OR_RAISE(bool need_rolling_file, NeedRollingFile()); if (need_rolling_file) { @@ -99,7 +102,8 @@ Status RollingBlobFileWriter::CloseCurrentWriter() { PAIMON_ASSIGN_OR_RAISE(std::shared_ptr main_data_file_meta, CloseMainWriter()); PAIMON_ASSIGN_OR_RAISE(std::vector> blob_metas, CloseBlobWriter()); - PAIMON_RETURN_NOT_OK(ValidateFileConsistency(main_data_file_meta, blob_metas)); + PAIMON_RETURN_NOT_OK( + ValidateFileConsistency(main_data_file_meta, blob_metas, blob_schema_->num_fields())); results_.push_back(main_data_file_meta); results_.insert(results_.end(), blob_metas.begin(), blob_metas.end()); @@ -126,28 +130,35 @@ Result>> RollingBlobFileWriter::CloseB PAIMON_RETURN_NOT_OK(blob_writer_->Close()); PAIMON_ASSIGN_OR_RAISE(std::vector> results, blob_writer_->GetResult()); - blob_writer_ = nullptr; + blob_writer_.reset(); return results; } Status RollingBlobFileWriter::ValidateFileConsistency( const std::shared_ptr& main_data_file_meta, - const std::vector>& blob_tagged_metas) { + const std::vector>& blob_tagged_metas, int32_t blob_field_count) { + if (blob_tagged_metas.empty()) { + return Status::OK(); + } + // With multiple blob fields, each blob field produces its own set of files. + // total_blob_row_count should be exactly main_row_count * blob_field_count. int64_t main_row_count = main_data_file_meta->row_count; - int64_t blob_row_count = 0; + int64_t expected_blob_row_count = main_row_count * blob_field_count; + int64_t total_blob_row_count = 0; for (const auto& blob_tagged_meta : blob_tagged_metas) { - blob_row_count += blob_tagged_meta->row_count; + total_blob_row_count += blob_tagged_meta->row_count; } - if (main_row_count != blob_row_count) { + if (total_blob_row_count != expected_blob_row_count) { std::vector blob_file_names; for (const auto& blob_tagged_meta : blob_tagged_metas) { blob_file_names.push_back(blob_tagged_meta->file_name); } - return Status::Invalid( - fmt::format("This is a bug: The row count of main file and blob files does not match. " - "Main file: {} (row count: {}), blob files: {} (total row count: {})", - main_data_file_meta->file_name, main_row_count, - fmt::join(blob_file_names, ", "), blob_row_count)); + return Status::Invalid(fmt::format( + "This is a bug: The row count of main file and blob files does not match. " + "Main file: {} (row count: {}), blob field count: {}, " + "expected blob row count: {}, blob files: {} (actual total row count: {})", + main_data_file_meta->file_name, main_row_count, blob_field_count, + expected_blob_row_count, fmt::join(blob_file_names, ", "), total_blob_row_count)); } return Status::OK(); } diff --git a/src/paimon/core/io/rolling_blob_file_writer.h b/src/paimon/core/io/rolling_blob_file_writer.h index 96b9003c7..169fbbd70 100644 --- a/src/paimon/core/io/rolling_blob_file_writer.h +++ b/src/paimon/core/io/rolling_blob_file_writer.h @@ -25,6 +25,7 @@ #include "arrow/c/bridge.h" #include "arrow/result.h" #include "paimon/common/metrics/metrics_impl.h" +#include "paimon/core/io/multiple_blob_file_writer.h" #include "paimon/core/io/rolling_file_writer.h" #include "paimon/metrics.h" #include "paimon/record_batch.h" @@ -35,34 +36,33 @@ namespace paimon { /// files for normal columns and blob columns, managing their lifecycle and ensuring consistency /// between them. /// +/// Multiple blob fields are supported. Each blob field is written to its own set of blob files +/// independently via MultipleBlobFileWriter. +/// ///
 /// For example,
-/// given a table schema with normal columns (id INT, name STRING) and a blob column (data BLOB),
-/// this writer will create separate files for (id, name) and (data).
+/// given a table schema with normal columns (id INT, name STRING) and blob columns (data1 BLOB,
+/// data2 BLOB), this writer will create separate files for (id, name), (data1), and (data2).
 /// It will roll files based on the specified target file size, ensuring that both normal and blob
 /// files are rolled simultaneously.
 ///
 /// Every time a file is rolled, the writer will close the current normal data file and blob data
 /// files, so one normal data file may correspond to multiple blob data files.
 ///
-/// Normal file1: f1.parquet may including (b1.blob, b2.blob, b3.blob)
-/// Normal file2: f1-2.parquet may including (b4.blob, b5.blob)
+/// Normal file1: f1.parquet may include (blob1_1.blob, blob1_2.blob, blob2_1.blob)
+/// Normal file2: f2.parquet may include (blob1_3.blob, blob2_2.blob)
 ///
 /// 
class RollingBlobFileWriter : public RollingFileWriter<::ArrowArray*, std::shared_ptr> { public: using MainWriter = SingleFileWriter<::ArrowArray*, std::shared_ptr>; - using BlobWriter = RollingFileWriter<::ArrowArray*, std::shared_ptr>; - - // Expected number of blob fields in a table. - static constexpr int32_t EXPECTED_BLOB_FIELD_COUNT = 1; - RollingBlobFileWriter( - int64_t target_file_size, - std::function>()> create_file_writer, - std::function>()> create_blob_file_writer, - const std::shared_ptr& data_type); + RollingBlobFileWriter(int64_t target_file_size, + std::function>()> create_file_writer, + const std::shared_ptr& blob_schema, + MultipleBlobFileWriter::BlobWriterCreator blob_writer_creator, + const std::shared_ptr& data_type); ~RollingBlobFileWriter() override = default; Status Write(::ArrowArray* record) override; @@ -73,15 +73,17 @@ class RollingBlobFileWriter private: static Status ValidateFileConsistency( const std::shared_ptr& main_data_file_meta, - const std::vector>& blob_tagged_metas); + const std::vector>& blob_tagged_metas, + int32_t blob_field_count); Status CloseCurrentWriter(); Result> CloseMainWriter(); Result>> CloseBlobWriter(); - std::function>()> create_blob_file_writer_; - std::unique_ptr blob_writer_; + std::shared_ptr blob_schema_; + MultipleBlobFileWriter::BlobWriterCreator blob_writer_creator_; + std::unique_ptr blob_writer_; std::shared_ptr data_type_; std::unique_ptr logger_; diff --git a/src/paimon/core/io/rolling_blob_file_writer_test.cpp b/src/paimon/core/io/rolling_blob_file_writer_test.cpp index 4a144eeda..1d72563e3 100644 --- a/src/paimon/core/io/rolling_blob_file_writer_test.cpp +++ b/src/paimon/core/io/rolling_blob_file_writer_test.cpp @@ -82,8 +82,10 @@ TEST_F(RollingBlobFileWriterTest, ValidateFileConsistency) { /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, /*first_row_id=*/3, /*write_cols=*/std::vector({"blob"})); - ASSERT_OK(RollingBlobFileWriter::ValidateFileConsistency(file_meta1, {file_meta2, file_meta3})); - ASSERT_NOK_WITH_MSG(RollingBlobFileWriter::ValidateFileConsistency(file_meta1, {file_meta2}), + ASSERT_OK(RollingBlobFileWriter::ValidateFileConsistency(file_meta1, {file_meta2, file_meta3}, + /*blob_field_count=*/1)); + ASSERT_NOK_WITH_MSG(RollingBlobFileWriter::ValidateFileConsistency(file_meta1, {file_meta2}, + /*blob_field_count=*/2), "This is a bug: The row count of main file and blob files does not match."); } diff --git a/src/paimon/core/operation/file_store_commit_impl.cpp b/src/paimon/core/operation/file_store_commit_impl.cpp index c0e545c34..3f263ab19 100644 --- a/src/paimon/core/operation/file_store_commit_impl.cpp +++ b/src/paimon/core/operation/file_store_commit_impl.cpp @@ -31,6 +31,7 @@ #include "paimon/common/data/blob_utils.h" #include "paimon/common/executor/future.h" #include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/table/special_fields.h" #include "paimon/common/utils/binary_row_partition_computer.h" #include "paimon/common/utils/date_time_utils.h" #include "paimon/common/utils/scope_guard.h" @@ -653,6 +654,17 @@ Result FileStoreCommitImpl::TryCommitOnce( PAIMON_ASSIGN_OR_RAISE(base_manifest_list, manifest_list_->Write(merge_after_manifests)); if (options_.RowTrackingEnabled()) { + if (options_.RowTrackingPartitionGroupOnCommit()) { + std::unordered_map> delta_files_by_partition; + for (auto& entry : delta_files) { + delta_files_by_partition[entry.Partition()].push_back(std::move(entry)); + } + delta_files.clear(); + for (auto& [_, entries] : delta_files_by_partition) { + delta_files.insert(delta_files.end(), std::make_move_iterator(entries.begin()), + std::make_move_iterator(entries.end())); + } + } // assigned snapshot id to delta files AssignSnapshotId(new_snapshot_id, &delta_files); // assign row id for new files @@ -753,28 +765,47 @@ Result FileStoreCommitImpl::AssignRowTrackingMeta( } // assign row id for new files int64_t start = first_row_id_start; - int64_t blob_start = first_row_id_start; + int64_t blob_start_default = first_row_id_start; + // Per-blob-field row id tracking: each blob field maintains its own start position, + // keyed by the blob field name (from write_cols[0]). + std::map blob_starts; + // TODO(xinyu.lxy): support vector store file row tracking when vector store is implemented for (auto& entry : *delta_files) { if (entry.File()->file_source == std::nullopt) { return Status::Invalid( "This is a bug, file source field for row-tracking table must present."); } + bool contains_row_id = + entry.File()->write_cols.has_value() && + std::find(entry.File()->write_cols->begin(), entry.File()->write_cols->end(), + SpecialFields::RowId().Name()) != entry.File()->write_cols->end(); if (entry.File()->file_source.value() == FileSource::Append() && - entry.File()->first_row_id == std::nullopt) { + entry.File()->first_row_id == std::nullopt && !contains_row_id) { + int64_t row_count = entry.File()->row_count; if (BlobUtils::IsBlobFile(entry.File()->file_name)) { + // Use the first write_col as the blob field name to support + // independent row tracking per blob field. + std::string blob_field_name; + if (!entry.File()->write_cols || entry.File()->write_cols->empty()) { + return Status::Invalid(fmt::format( + "invalid blob file {}: does not have write_cols", entry.File()->file_name)); + } + blob_field_name = entry.File()->write_cols->at(0); + int64_t blob_start = blob_starts.count(blob_field_name) + ? blob_starts[blob_field_name] + : blob_start_default; if (blob_start >= start) { return Status::Invalid(fmt::format( "This is a bug, blob start {} should be less than start {} when " "assigning a blob entry file.", blob_start, start)); } - int64_t row_count = entry.File()->row_count; entry.AssignFirstRowId(blob_start); - blob_start += row_count; + blob_starts[blob_field_name] = blob_start + row_count; } else { - int64_t row_count = entry.File()->row_count; entry.AssignFirstRowId(start); - blob_start = start; + blob_start_default = start; + blob_starts.clear(); start += row_count; } } diff --git a/src/paimon/core/schema/schema_validation.cpp b/src/paimon/core/schema/schema_validation.cpp index 0c3e28eb4..b8c4bb692 100644 --- a/src/paimon/core/schema/schema_validation.cpp +++ b/src/paimon/core/schema/schema_validation.cpp @@ -118,6 +118,7 @@ Status SchemaValidation::ValidateTableSchema(const TableSchema& schema) { } PAIMON_RETURN_NOT_OK(ValidateRowTracking(schema, options)); + PAIMON_RETURN_NOT_OK(ValidateBlobFields(schema, options)); return Status::OK(); } @@ -410,22 +411,50 @@ Status SchemaValidation::ValidateRowTracking(const TableSchema& table_schema, !options.DeletionVectorsEnabled(), "Data evolution config must disabled with deletion-vectors.enabled")); } - int64_t blob_field_count = 0; - for (const auto& field : table_schema.Fields()) { - if (BlobUtils::IsBlobField(field.ArrowField())) { - blob_field_count += 1; + return Status::OK(); +} + +Status SchemaValidation::ValidateBlobFields(const TableSchema& schema, const CoreOptions& options) { + const auto& configured_blob_names = options.GetBlobFields(); + if (configured_blob_names.empty()) { + return Status::OK(); + } + + PAIMON_ASSIGN_OR_RAISE(std::vector blob_fields, + schema.GetFields(configured_blob_names)); + for (const auto& blob_field : blob_fields) { + if (!BlobUtils::IsBlobField(blob_field.ArrowField())) { + return Status::Invalid( + fmt::format("Field {} in {} must be a BLOB field in table schema.", + blob_field.Name(), fmt::join(configured_blob_names, ", "))); } } - if (blob_field_count > 0) { - PAIMON_RETURN_NOT_OK(Preconditions::CheckState( - options.DataEvolutionEnabled(), - "Data evolution config must be enabled for table with BLOB type column.")); - PAIMON_RETURN_NOT_OK(Preconditions::CheckState( - blob_field_count == 1, "Table with BLOB type column only supports one BLOB column.")); - PAIMON_RETURN_NOT_OK(Preconditions::CheckState( - table_schema.Fields().size() > 1, - "Table with BLOB type column must have other normal columns.")); + + // Validate no duplicate blob field names + PAIMON_RETURN_NOT_OK(ValidateNoDuplicateField(configured_blob_names, "blob field")); + + // Validate blob fields cannot be primary keys or partition keys + for (const auto& blob_field_name : configured_blob_names) { + if (std::find(schema.PrimaryKeys().begin(), schema.PrimaryKeys().end(), blob_field_name) != + schema.PrimaryKeys().end()) { + return Status::Invalid( + fmt::format("Blob field {} cannot be a primary key.", blob_field_name)); + } + if (std::find(schema.PartitionKeys().begin(), schema.PartitionKeys().end(), + blob_field_name) != schema.PartitionKeys().end()) { + return Status::Invalid( + fmt::format("Blob field {} cannot be a partition key.", blob_field_name)); + } } + + // Validate data evolution must be enabled when blob-field is configured + PAIMON_RETURN_NOT_OK(Preconditions::CheckState( + options.DataEvolutionEnabled(), + "Data evolution config must be enabled for table with BLOB type column.")); + PAIMON_RETURN_NOT_OK( + Preconditions::CheckState(schema.Fields().size() > configured_blob_names.size(), + "Table with BLOB type column must have other normal columns.")); + // TODO(xinyu.lxy): validate blob-descriptor-field and blob-view-field return Status::OK(); } diff --git a/src/paimon/core/schema/schema_validation.h b/src/paimon/core/schema/schema_validation.h index a4aa36b60..52c63f91a 100644 --- a/src/paimon/core/schema/schema_validation.h +++ b/src/paimon/core/schema/schema_validation.h @@ -68,6 +68,8 @@ class SchemaValidation { static Status ValidateRowTracking(const TableSchema& table_schema, const CoreOptions& options); + static Status ValidateBlobFields(const TableSchema& schema, const CoreOptions& options); + static bool IsComplexType(const std::shared_ptr& field); }; diff --git a/src/paimon/core/schema/schema_validation_test.cpp b/src/paimon/core/schema/schema_validation_test.cpp index f09c9de36..9b28ff5d5 100644 --- a/src/paimon/core/schema/schema_validation_test.cpp +++ b/src/paimon/core/schema/schema_validation_test.cpp @@ -74,11 +74,24 @@ TEST(SchemaValidationTest, TestWithBlobField) { auto schema = arrow::schema(fields); std::vector primary_keys = {}; std::vector partition_keys = {"f1"}; - std::map options = { - {Options::BUCKET, "-1"}, - {Options::ROW_TRACKING_ENABLED, "true"}, - {Options::DATA_EVOLUTION_ENABLED, "true"}, - }; + std::map options = {{Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_FIELD, "f3"}}; + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + ASSERT_OK(SchemaValidation::ValidateTableSchema(*table_schema)); + } + { + arrow::FieldVector fields = {f0, f1, f2, f3, f4}; + auto schema = arrow::schema(fields); + std::vector primary_keys = {}; + std::vector partition_keys = {"f1"}; + std::map options = {{Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_FIELD, "f3,f4"}}; ASSERT_OK_AND_ASSIGN( std::shared_ptr table_schema, TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); @@ -89,11 +102,10 @@ TEST(SchemaValidationTest, TestWithBlobField) { auto schema = arrow::schema(fields); std::vector primary_keys = {}; std::vector partition_keys = {"f1"}; - std::map options = { - {Options::BUCKET, "-1"}, - {Options::ROW_TRACKING_ENABLED, "true"}, - {Options::DATA_EVOLUTION_ENABLED, "false"}, - }; + std::map options = {{Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "false"}, + {Options::BLOB_FIELD, "f3"}}; ASSERT_OK_AND_ASSIGN( std::shared_ptr table_schema, TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); @@ -102,36 +114,81 @@ TEST(SchemaValidationTest, TestWithBlobField) { "Data evolution config must be enabled for table with BLOB type column."); } { - arrow::FieldVector fields = {f0, f1, f2, f3, f4}; + arrow::FieldVector fields = {f3}; + auto schema = arrow::schema(fields); + std::vector primary_keys = {}; + std::vector partition_keys = {}; + std::map options = {{Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_FIELD, "f3"}}; + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), + "Table with BLOB type column must have other normal columns."); + } + { + arrow::FieldVector fields = {f0, f1, f2, f3}; auto schema = arrow::schema(fields); std::vector primary_keys = {}; std::vector partition_keys = {"f1"}; - std::map options = { - {Options::BUCKET, "-1"}, - {Options::ROW_TRACKING_ENABLED, "true"}, - {Options::DATA_EVOLUTION_ENABLED, "true"}, - }; + std::map options = {{Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_FIELD, "non-exist"}}; ASSERT_OK_AND_ASSIGN( std::shared_ptr table_schema, TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), - "Table with BLOB type column only supports one BLOB column."); + "Get field non-exist failed: not exist in table schema"); } { - arrow::FieldVector fields = {f3}; + arrow::FieldVector fields = {f0, f1, f2, f3}; auto schema = arrow::schema(fields); std::vector primary_keys = {}; - std::vector partition_keys = {}; - std::map options = { - {Options::BUCKET, "-1"}, - {Options::ROW_TRACKING_ENABLED, "true"}, - {Options::DATA_EVOLUTION_ENABLED, "true"}, - }; + std::vector partition_keys = {"f1"}; + std::map options = {{Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_FIELD, "f3,f0"}}; ASSERT_OK_AND_ASSIGN( std::shared_ptr table_schema, TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateTableSchema(*table_schema), - "Table with BLOB type column must have other normal columns."); + "Field f0 in f3, f0 must be a BLOB field in table schema."); + } + { + arrow::FieldVector fields = {f0, f1, f2, f3}; + auto schema = arrow::schema(fields); + std::vector primary_keys = {}; + std::vector partition_keys = {"f3"}; + std::map options = {{Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_FIELD, "f3"}}; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateBlobFields(*table_schema, core_options), + "Blob field f3 cannot be a partition key."); + } + { + arrow::FieldVector fields = {f0, f1, f2, f3}; + auto schema = arrow::schema(fields); + std::vector primary_keys = {"f3"}; + std::vector partition_keys = {}; + std::map options = {{Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_FIELD, "f3"}}; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + ASSERT_NOK_WITH_MSG(SchemaValidation::ValidateBlobFields(*table_schema, core_options), + "Blob field f3 cannot be a primary key."); } } diff --git a/src/paimon/format/blob/blob_file_batch_reader.cpp b/src/paimon/format/blob/blob_file_batch_reader.cpp index d47e41730..d9e082603 100644 --- a/src/paimon/format/blob/blob_file_batch_reader.cpp +++ b/src/paimon/format/blob/blob_file_batch_reader.cpp @@ -24,6 +24,7 @@ #include "arrow/array/builder_dict.h" #include "arrow/array/builder_nested.h" #include "arrow/c/bridge.h" +#include "arrow/util/bit_util.h" #include "fmt/format.h" #include "paimon/common/data/blob_utils.h" #include "paimon/common/executor/future.h" @@ -50,23 +51,25 @@ Result> BlobFileBatchReader::Create( } PAIMON_ASSIGN_OR_RAISE(uint64_t file_size, input_stream->Length()); - PAIMON_RETURN_NOT_OK(input_stream->Seek(file_size - kBlobFileHeaderLength, FS_SEEK_SET)); - int8_t header[kBlobFileHeaderLength]; - PAIMON_ASSIGN_OR_RAISE(int32_t actual_size, input_stream->Read(reinterpret_cast(header), - kBlobFileHeaderLength)); - if (actual_size != kBlobFileHeaderLength) { + PAIMON_RETURN_NOT_OK( + input_stream->Seek(file_size - BlobDefs::kBlobFileHeaderLength, FS_SEEK_SET)); + int8_t header[BlobDefs::kBlobFileHeaderLength]; + PAIMON_ASSIGN_OR_RAISE( + int32_t actual_size, + input_stream->Read(reinterpret_cast(header), BlobDefs::kBlobFileHeaderLength)); + if (actual_size != BlobDefs::kBlobFileHeaderLength) { return Status::Invalid( fmt::format("actual read size {} not match with expect header length {}", actual_size, - kBlobFileHeaderLength)); + BlobDefs::kBlobFileHeaderLength)); } int8_t version = header[4]; - if (version != 1) { + if (version != BlobDefs::kFileVersion) { return Status::Invalid(fmt::format( "create blob format reader failed. unsupported blob file version: {}", version)); } int32_t index_length = GetIndexLength(header, 0); - PAIMON_RETURN_NOT_OK( - input_stream->Seek(file_size - kBlobFileHeaderLength - index_length, FS_SEEK_SET)); + PAIMON_RETURN_NOT_OK(input_stream->Seek( + file_size - BlobDefs::kBlobFileHeaderLength - index_length, FS_SEEK_SET)); std::vector index_bytes(index_length, '\0'); PAIMON_ASSIGN_OR_RAISE(actual_size, input_stream->Read(index_bytes.data(), index_length)); if (actual_size != index_length) { @@ -82,7 +85,10 @@ Result> BlobFileBatchReader::Create( int64_t offset = 0; for (const auto& blob_length : blob_lengths) { blob_offsets.push_back(offset); - offset += blob_length; + // Null blobs (bin_length == -1) don't occupy file space + if (blob_length >= 0) { + offset += blob_length; + } } PAIMON_ASSIGN_OR_RAISE(std::string file_path, input_stream->GetUri()); auto reader = std::unique_ptr(new BlobFileBatchReader( @@ -165,7 +171,10 @@ Result> BlobFileBatchReader::NextBlobOffsets( int64_t data_length = 0; for (int32_t k = 0; k < rows_to_read; ++k) { const size_t i = current_pos_ + k; - data_length += GetTargetContentLength(i); + // Null blobs contribute zero bytes to content + if (!IsTargetNull(i)) { + data_length += GetTargetContentLength(i); + } PAIMON_RETURN_NOT_OK_FROM_ARROW(buffer_builder.Append(data_length)); } PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr offset_buffer, @@ -178,13 +187,18 @@ Result> BlobFileBatchReader::NextBlobContents( int64_t total_length = 0; for (int32_t k = 0; k < rows_to_read; ++k) { const size_t i = current_pos_ + k; - total_length += GetTargetContentLength(i); + if (!IsTargetNull(i)) { + total_length += GetTargetContentLength(i); + } } PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr data_buffer, arrow::AllocateBuffer(total_length, arrow_pool_.get())); uint8_t* buffer = data_buffer->mutable_data(); for (int32_t k = 0; k < rows_to_read; ++k) { const size_t i = current_pos_ + k; + if (IsTargetNull(i)) { + continue; + } int64_t offset = GetTargetContentOffset(i); int64_t length = GetTargetContentLength(i); PAIMON_RETURN_NOT_OK(ReadBlobContentAt(offset, length, buffer)); @@ -193,13 +207,40 @@ Result> BlobFileBatchReader::NextBlobContents( return data_buffer; } +Result> BlobFileBatchReader::BuildNullBitmap( + int32_t rows_to_read) const { + bool has_null = false; + for (int32_t k = 0; k < rows_to_read; ++k) { + if (IsTargetNull(current_pos_ + k)) { + has_null = true; + break; + } + } + if (!has_null) { + return std::shared_ptr(); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr null_bitmap, + arrow::AllocateBitmap(rows_to_read, arrow_pool_.get())); + // Initialize all bits to 1 (valid), then clear bits for null rows + memset(null_bitmap->mutable_data(), 0xFF, null_bitmap->size()); + for (int32_t k = 0; k < rows_to_read; ++k) { + if (IsTargetNull(current_pos_ + k)) { + arrow::bit_util::ClearBit(null_bitmap->mutable_data(), k); + } + } + return null_bitmap; +} + Result> BlobFileBatchReader::BuildContentArray( int32_t rows_to_read) const { PAIMON_ASSIGN_OR_RAISE(std::shared_ptr value_offsets, NextBlobOffsets(rows_to_read)); PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data, NextBlobContents(rows_to_read)); - auto large_binary_array = - std::make_shared(rows_to_read, value_offsets, data); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr child_null_bitmap, + BuildNullBitmap(rows_to_read)); + + auto large_binary_array = std::make_shared(rows_to_read, value_offsets, + data, child_null_bitmap); std::vector> child_data; child_data.emplace_back(large_binary_array->data()); std::shared_ptr struct_array_data = @@ -213,23 +254,44 @@ Result> BlobFileBatchReader::BuildTargetArray( if (!blob_as_descriptor_) { return BuildContentArray(rows_to_read); } - std::vector> blobs; - blobs.reserve(rows_to_read); + // For descriptor mode, build using StructBuilder to handle nulls properly + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr array_builder, + arrow::MakeBuilder(target_type_, arrow_pool_.get())); + auto builder = dynamic_cast(array_builder.get()); + if (builder == nullptr) { + return Status::Invalid("cast to struct builder failed"); + } + auto field_builder = dynamic_cast(builder->field_builder(0)); + if (field_builder == nullptr) { + return Status::Invalid("cast to large binary builder failed"); + } for (int32_t k = 0; k < rows_to_read; ++k) { const size_t i = current_pos_ + k; - int64_t offset = GetTargetContentOffset(i); - int64_t length = GetTargetContentLength(i); - PAIMON_ASSIGN_OR_RAISE(std::unique_ptr blob, - Blob::FromPath(file_path_, offset, length)); - blobs.emplace_back(blob->ToDescriptor(pool_)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append()); + if (IsTargetNull(i)) { + PAIMON_RETURN_NOT_OK_FROM_ARROW(field_builder->AppendNull()); + } else { + int64_t offset = GetTargetContentOffset(i); + int64_t length = GetTargetContentLength(i); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr blob, + Blob::FromPath(file_path_, offset, length)); + auto descriptor = blob->ToDescriptor(pool_); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + field_builder->Append(descriptor->data(), descriptor->size())); + } } - return ToArrowArray(blobs); + std::shared_ptr array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Finish(&array)); + return array; } Result BlobFileBatchReader::NextBatch() { if (closed_) { return Status::Invalid("blob file batch reader is closed"); } + if (target_type_ == nullptr) { + return Status::Invalid("target type is nullptr, call SetReadSchema first"); + } if (current_pos_ >= target_blob_lengths_.size()) { PAIMON_ASSIGN_OR_RAISE(previous_batch_first_row_number_, GetNumberOfRows()); return BatchReader::MakeEofBatch(); @@ -254,30 +316,6 @@ Status BlobFileBatchReader::ReadBlobContentAt(const int64_t offset, const int64_ reinterpret_cast(content)); } -Result> BlobFileBatchReader::ToArrowArray( - const std::vector>& blobs) const { - if (target_type_ == nullptr) { - return Status::Invalid("target type is nullptr, call SetReadSchema first"); - } - PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr array_builder, - arrow::MakeBuilder(target_type_, arrow_pool_.get())); - auto builder = dynamic_cast(array_builder.get()); - if (builder == nullptr) { - return Status::Invalid("cast to struct builder failed"); - } - auto field_builder = dynamic_cast(builder->field_builder(0)); - if (field_builder == nullptr) { - return Status::Invalid("cast to large binary builder failed"); - } - for (const auto& blob : blobs) { - PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append()); - PAIMON_RETURN_NOT_OK_FROM_ARROW(field_builder->Append(blob->data(), blob->size())); - } - std::shared_ptr array; - PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Finish(&array)); - return array; -} - int32_t BlobFileBatchReader::GetIndexLength(const int8_t* bytes, int32_t offset) { return (bytes[offset + 3] << 24) | ((bytes[offset + 2] & 0xff) << 16) | ((bytes[offset + 1] & 0xff) << 8) | (bytes[offset] & 0xff); diff --git a/src/paimon/format/blob/blob_file_batch_reader.h b/src/paimon/format/blob/blob_file_batch_reader.h index d71b8425a..06287d759 100644 --- a/src/paimon/format/blob/blob_file_batch_reader.h +++ b/src/paimon/format/blob/blob_file_batch_reader.h @@ -24,6 +24,7 @@ #include "arrow/memory_pool.h" #include "arrow/type.h" +#include "paimon/common/data/blob_defs.h" #include "paimon/fs/file_system.h" #include "paimon/memory/bytes.h" #include "paimon/predicate/predicate.h" @@ -85,8 +86,6 @@ namespace paimon::blob { /// - Current version is 1. class BlobFileBatchReader : public FileBatchReader { public: - static constexpr uint32_t kBlobFileHeaderLength = 5; - static Result> Create( const std::shared_ptr& input_stream, int32_t batch_size, bool blob_as_descriptor, const std::shared_ptr& pool); @@ -125,8 +124,6 @@ class BlobFileBatchReader : public FileBatchReader { } private: - static constexpr int32_t kBlobContentStartOffset = 4; - static constexpr int32_t kBlobTotalMetaLength = 16; static constexpr uint64_t kDefaultReadChunkSize = 1024 * 1024; static int32_t GetIndexLength(const int8_t* bytes, int32_t offset); @@ -136,22 +133,26 @@ class BlobFileBatchReader : public FileBatchReader { const std::vector& blob_offsets, int32_t batch_size, bool blob_as_descriptor, const std::shared_ptr& pool); - Result> ToArrowArray( - const std::vector>& blobs) const; - Status ReadBlobContentAt(const int64_t offset, const int64_t length, uint8_t* content) const; Result> NextBlobOffsets(int32_t rows_to_read) const; Result> NextBlobContents(int32_t rows_to_read) const; + /// Builds a null bitmap buffer for the given rows. Returns nullptr if no nulls. + Result> BuildNullBitmap(int32_t rows_to_read) const; Result> BuildContentArray(int32_t rows_to_read) const; Result> BuildTargetArray(int32_t rows_to_read) const; + /// Returns true if the blob at the given index is null (bin_length == kNullBinLength). + bool IsTargetNull(size_t index) const { + return target_blob_lengths_[index] == BlobDefs::kNullBinLength; + } + int64_t GetTargetContentOffset(size_t index) const { - return target_blob_offsets_[index] + kBlobContentStartOffset; + return target_blob_offsets_[index] + BlobDefs::kContentStartOffset; } int64_t GetTargetContentLength(size_t index) const { - return target_blob_lengths_[index] - kBlobTotalMetaLength; + return target_blob_lengths_[index] - BlobDefs::kTotalMetaLength; } std::shared_ptr input_stream_; diff --git a/src/paimon/format/blob/blob_format_writer.cpp b/src/paimon/format/blob/blob_format_writer.cpp index d2cdc9fac..f41e21e90 100644 --- a/src/paimon/format/blob/blob_format_writer.cpp +++ b/src/paimon/format/blob/blob_format_writer.cpp @@ -20,6 +20,7 @@ #include "arrow/api.h" #include "arrow/c/bridge.h" +#include "paimon/common/data/blob_defs.h" #include "paimon/common/data/blob_utils.h" #include "paimon/common/memory/memory_segment_utils.h" #include "paimon/common/metrics/metrics_impl.h" @@ -41,7 +42,7 @@ BlobFormatWriter::BlobFormatWriter(bool blob_as_descriptor, fs_(fs), pool_(pool) { metrics_ = std::make_shared(); - tmp_buffer_ = Bytes::AllocateBytes(TMP_BUFFER_SIZE, pool_.get()); + tmp_buffer_ = Bytes::AllocateBytes(kTmpBufferSize, pool_.get()); } Result> BlobFormatWriter::Create( @@ -82,9 +83,17 @@ Status BlobFormatWriter::AddBatch(ArrowArray* batch) { assert(arrow_array->num_fields() == 1); auto struct_array = arrow::internal::checked_pointer_cast(arrow_array); auto child_array = struct_array->field(0); - if (arrow_array->null_count() != 0 || child_array->null_count() != 0) { - return Status::Invalid("BlobFormatWriter only support non-null blob."); + + // Struct-level null is not supported (caller should not pass null struct rows) + if (struct_array->IsNull(0)) { + return Status::Invalid("BlobFormatWriter does not support struct-level null."); + } + // Child-level null: record kNullBinLength, skip data writing (aligned with Java) + if (child_array->IsNull(0)) { + bin_lengths_.push_back(BlobDefs::kNullBinLength); + return Status::OK(); } + if (child_array->type_id() != arrow::Type::type::LARGE_BINARY) { return Status::Invalid("BlobFormatWriter only support large binary type."); } @@ -110,7 +119,8 @@ Status BlobFormatWriter::Finish() { PAIMON_UNIQUE_PTR index_length_bytes = IntegerToLittleEndian(static_cast(index_bytes.size()), pool_); PAIMON_RETURN_NOT_OK(WriteBytes(index_length_bytes->data(), index_length_bytes->size())); - PAIMON_RETURN_NOT_OK(WriteBytes(reinterpret_cast(&VERSION), sizeof(VERSION))); + PAIMON_RETURN_NOT_OK(WriteBytes(reinterpret_cast(&BlobDefs::kFileVersion), + sizeof(BlobDefs::kFileVersion))); PAIMON_RETURN_NOT_OK(Flush()); @@ -123,9 +133,9 @@ Status BlobFormatWriter::WriteBlob(std::string_view blob_data) { PAIMON_ASSIGN_OR_RAISE(int64_t previous_pos, out_->GetPos()); // write magic number - static PAIMON_UNIQUE_PTR MAGIC_NUMBER_BYTES = - IntegerToLittleEndian(MAGIC_NUMBER, pool_); - PAIMON_RETURN_NOT_OK(WriteWithCrc32(MAGIC_NUMBER_BYTES->data(), MAGIC_NUMBER_BYTES->size())); + static PAIMON_UNIQUE_PTR kMagicNumberBytes = + IntegerToLittleEndian(BlobDefs::kMagicNumber, pool_); + PAIMON_RETURN_NOT_OK(WriteWithCrc32(kMagicNumberBytes->data(), kMagicNumberBytes->size())); // write blob content std::unique_ptr in; diff --git a/src/paimon/format/blob/blob_format_writer.h b/src/paimon/format/blob/blob_format_writer.h index 82bab5ddb..c52437e8d 100644 --- a/src/paimon/format/blob/blob_format_writer.h +++ b/src/paimon/format/blob/blob_format_writer.h @@ -79,10 +79,8 @@ class BlobFormatWriter : public FormatWriter { static PAIMON_UNIQUE_PTR IntegerToLittleEndian(T value, const std::shared_ptr& pool); - private: - static constexpr int8_t VERSION = 1; - static constexpr int32_t MAGIC_NUMBER = 1481511375; - static constexpr uint32_t TMP_BUFFER_SIZE = 1024 * 1024; + public: + static constexpr uint32_t kTmpBufferSize = 1024 * 1024; private: bool blob_as_descriptor_; diff --git a/src/paimon/format/blob/blob_format_writer_test.cpp b/src/paimon/format/blob/blob_format_writer_test.cpp index 65ef67977..3f3779108 100644 --- a/src/paimon/format/blob/blob_format_writer_test.cpp +++ b/src/paimon/format/blob/blob_format_writer_test.cpp @@ -294,7 +294,7 @@ TEST_P(BlobFormatWriterTest, TestLargeBlob) { file_system_->Create(large_file_path, /*overwrite=*/true)); // Write data larger than TMP_BUFFER_SIZE (1MB) - const size_t large_size = BlobFormatWriter::TMP_BUFFER_SIZE * 2 + 1000; // ~2MB + const size_t large_size = BlobFormatWriter::kTmpBufferSize * 2 + 1000; // ~2MB std::vector large_data(large_size, 'A'); ASSERT_OK_AND_ASSIGN(int32_t written, large_file_stream->Write(large_data.data(), large_size)); ASSERT_EQ(written, large_size); @@ -343,35 +343,56 @@ TEST_P(BlobFormatWriterTest, TestAddBatchWithNullValues) { BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_, file_system_, pool_)); - // Test with null struct array + // Write one row with child-level null blob arrow::StructBuilder struct_builder(struct_type_, arrow::default_memory_pool(), - {std::make_shared()}); - ASSERT_TRUE(struct_builder.AppendNull().ok()); // Append null instead of valid value + {std::make_shared()}); + auto blob_builder = static_cast(struct_builder.field_builder(0)); + ASSERT_TRUE(struct_builder.Append().ok()); + ASSERT_TRUE(blob_builder->AppendNull().ok()); + std::shared_ptr null_child_array; + ASSERT_TRUE(struct_builder.Finish(&null_child_array).ok()); + auto c_array = std::make_unique(); + ASSERT_TRUE(arrow::ExportArray(*null_child_array, c_array.get()).ok()); + ASSERT_OK(writer->AddBatch(c_array.get())); + ASSERT_OK(writer->Flush()); + ASSERT_OK(writer->Finish()); + + // Read back and verify + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, + file_system_->Open(dir_->Str() + "/file.blob")); + ASSERT_TRUE(input_stream); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr reader, + BlobFileBatchReader::Create(input_stream, /*batch_size=*/1024, blob_as_descriptor_, pool_)); + auto schema = arrow::schema(struct_type_->fields()); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + ASSERT_OK( + reader->SetReadSchema(&c_schema, /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt)); + ASSERT_OK_AND_ASSIGN(auto chunked_array, + paimon::test::ReadResultCollector::CollectResult(reader.get())); + + auto concat_array = arrow::Concatenate(chunked_array->chunks()).ValueOrDie(); + auto result_struct = arrow::internal::checked_pointer_cast(concat_array); + ASSERT_TRUE(result_struct); + ASSERT_EQ(result_struct->length(), 1); + ASSERT_TRUE(result_struct->field(0)->IsNull(0)); + + // Struct-level null should still be rejected + arrow::StructBuilder struct_builder2(struct_type_, arrow::default_memory_pool(), + {std::make_shared()}); + ASSERT_TRUE(struct_builder2.AppendNull().ok()); std::shared_ptr null_struct_array; - ASSERT_TRUE(struct_builder.Finish(&null_struct_array).ok()); + ASSERT_TRUE(struct_builder2.Finish(&null_struct_array).ok()); auto null_c_array = std::make_unique(); ASSERT_TRUE(arrow::ExportArray(*null_struct_array, null_c_array.get()).ok()); - - ASSERT_NOK_WITH_MSG(writer->AddBatch(null_c_array.get()), - "BlobFormatWriter only support non-null blob."); + ASSERT_OK_AND_ASSIGN(std::shared_ptr writer2, + BlobFormatWriter::Create(blob_as_descriptor_, output_stream_, struct_type_, + file_system_, pool_)); + ASSERT_NOK_WITH_MSG(writer2->AddBatch(null_c_array.get()), + "BlobFormatWriter does not support struct-level null."); ArrowArrayRelease(null_c_array.get()); - - // Test with null child array (blob field is null) - arrow::StructBuilder struct_builder2(struct_type_, arrow::default_memory_pool(), - {std::make_shared()}); - auto blob_builder = static_cast(struct_builder2.field_builder(0)); - ASSERT_TRUE(struct_builder2.Append().ok()); - ASSERT_TRUE(blob_builder->AppendNull().ok()); // Append null blob - - std::shared_ptr null_child_array; - ASSERT_TRUE(struct_builder2.Finish(&null_child_array).ok()); - auto null_child_c_array = std::make_unique(); - ASSERT_TRUE(arrow::ExportArray(*null_child_array, null_child_c_array.get()).ok()); - - ASSERT_NOK_WITH_MSG(writer->AddBatch(null_child_c_array.get()), - "BlobFormatWriter only support non-null blob."); - ArrowArrayRelease(null_child_c_array.get()); } TEST_P(BlobFormatWriterTest, TestAddBatchWithZeroLengthBlob) { diff --git a/test/inte/blob_table_inte_test.cpp b/test/inte/blob_table_inte_test.cpp index 07063ce08..f1d702ae3 100644 --- a/test/inte/blob_table_inte_test.cpp +++ b/test/inte/blob_table_inte_test.cpp @@ -1396,4 +1396,94 @@ TEST_P(BlobTableInteTest, TestWithRowIdsForMultipleBlobFiles) { } } +TEST_P(BlobTableInteTest, TestAppendTableWriteWithMultipleBlobFields) { + auto dir = UniqueTestDirectory::Create(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + BlobUtils::ToArrowField("blob1", true), BlobUtils::ToArrowField("blob2", true)}; + auto schema = arrow::schema(fields); + + auto file_format = GetParam(); + std::map options = { + {Options::MANIFEST_FORMAT, "orc"}, {Options::FILE_FORMAT, file_format}, + {Options::TARGET_FILE_SIZE, "700"}, {Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_AS_DESCRIPTOR, "false"}, {Options::FILE_SYSTEM, "local"}}; + + ASSERT_OK_AND_ASSIGN( + auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, options, /*is_streaming_mode=*/true)); + int64_t commit_identifier = 0; + + std::string data = R"([ + ["str_0", null, "apple", "red"], + ["str_1", 1, "banana", "yellow"], + ["str_2", 2, "cat", "black"] + ])"; + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch, + TestHelper::MakeRecordBatch(arrow::struct_(fields), data, + /*partition_map=*/{}, /*bucket=*/0, {})); + + ASSERT_OK_AND_ASSIGN(auto commit_msgs, + helper->WriteAndCommit(std::move(batch), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + ASSERT_EQ(commit_msgs.size(), 1); + + ASSERT_OK_AND_ASSIGN(std::optional snapshot, helper->LatestSnapshot()); + ASSERT_TRUE(snapshot); + ASSERT_EQ(1, snapshot.value().Id()); + ASSERT_EQ(3, snapshot.value().NextRowId().value()); + + // TODO(xinyu.lxy): add scan and read verification for multiple blob fields +} + +TEST_P(BlobTableInteTest, TestAppendWriteWithNullBlob) { + auto dir = UniqueTestDirectory::Create(); + arrow::FieldVector fields = {arrow::field("f0", arrow::int32()), + BlobUtils::ToArrowField("blob", true)}; + auto schema = arrow::schema(fields); + + auto file_format = GetParam(); + std::map options = {{Options::MANIFEST_FORMAT, "orc"}, + {Options::FILE_FORMAT, file_format}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, "local"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_AS_DESCRIPTOR, "false"}}; + + ASSERT_OK_AND_ASSIGN( + auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, options, /*is_streaming_mode=*/true)); + + // Write: row 0 non-null blob, row 1 null blob, row 2 non-null blob + std::string data = R"([ + [1, "hello"], + [2, null], + [3, "world"] + ])"; + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch, + TestHelper::MakeRecordBatch(arrow::struct_(fields), data, + /*partition_map=*/{}, /*bucket=*/0, {})); + ASSERT_OK_AND_ASSIGN(auto commit_msgs, + helper->WriteAndCommit(std::move(batch), /*commit_identifier=*/0, + /*expected_commit_messages=*/std::nullopt)); + + // Read and verify + arrow::FieldVector fields_with_row_kind = fields; + fields_with_row_kind.insert(fields_with_row_kind.begin(), + arrow::field("_VALUE_KIND", arrow::int8())); + auto data_type = arrow::struct_(fields_with_row_kind); + ASSERT_OK_AND_ASSIGN(std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + std::string expected_data = R"([ + [0, 1, "hello"], + [0, 2, null], + [0, 3, "world"] + ])"; + ASSERT_OK_AND_ASSIGN(bool success, + helper->ReadAndCheckResult(data_type, data_splits, expected_data)); + ASSERT_TRUE(success); +} + } // namespace paimon::test diff --git a/test/inte/write_inte_test.cpp b/test/inte/write_inte_test.cpp index 5379e25ff..a5d1ea456 100644 --- a/test/inte/write_inte_test.cpp +++ b/test/inte/write_inte_test.cpp @@ -231,6 +231,52 @@ class WriteInteTest : public testing::Test, public ::testing::WithParamInterface return new_meta; } + /// Build a StructArray with schema (f0:string, f1:int32, blob_field_1, blob_field_2, ...). + /// The int (f1) column will be null at rows where i % 3 == 0. + /// @param fields The full field vector of the schema. + /// @param blob_descriptors_per_field One vector of Bytes per blob field, all same length. + std::shared_ptr GenerateBlobArray( + const arrow::FieldVector& fields, + const std::vector>>& blob_descriptors_per_field) + const { + size_t num_blob_fields = blob_descriptors_per_field.size(); + EXPECT_GE(fields.size(), 2 + num_blob_fields); + size_t row_count = num_blob_fields > 0 ? blob_descriptors_per_field[0].size() : 0; + for (const auto& descriptors : blob_descriptors_per_field) { + EXPECT_EQ(descriptors.size(), row_count); + } + + std::vector> child_builders; + child_builders.push_back(std::make_shared()); + child_builders.push_back(std::make_shared()); + for (size_t b = 0; b < num_blob_fields; ++b) { + child_builders.push_back(std::make_shared()); + } + arrow::StructBuilder struct_builder(arrow::struct_(fields), arrow::default_memory_pool(), + child_builders); + auto string_builder = dynamic_cast(struct_builder.field_builder(0)); + auto int_builder = dynamic_cast(struct_builder.field_builder(1)); + + for (size_t i = 0; i < row_count; ++i) { + EXPECT_TRUE(struct_builder.Append().ok()); + EXPECT_TRUE(string_builder->Append("str_" + std::to_string(i)).ok()); + if (i % 3 == 0) { + EXPECT_TRUE(int_builder->AppendNull().ok()); + } else { + EXPECT_TRUE(int_builder->Append(static_cast(i)).ok()); + } + for (size_t b = 0; b < num_blob_fields; ++b) { + auto blob_builder = dynamic_cast( + struct_builder.field_builder(2 + static_cast(b))); + const auto& desc = blob_descriptors_per_field[b][i]; + EXPECT_TRUE(blob_builder->Append(desc->data(), desc->size()).ok()); + } + } + std::shared_ptr array; + EXPECT_TRUE(struct_builder.Finish(&array).ok()); + return array; + } + void CheckCreationTime(const std::vector>& commit_messages) { TimezoneGuard guard("Asia/Shanghai"); for (const auto& msg : commit_messages) { @@ -3722,34 +3768,6 @@ TEST_P(WriteInteTest, TestAppendTableWriteWithBlobType) { /*primary_keys=*/{}, options, /*is_streaming_mode=*/true)); int64_t commit_identifier = 0; - auto generate_blob_array = [&](const std::vector>& blob_descriptors) - -> std::shared_ptr { - arrow::StructBuilder struct_builder( - arrow::struct_(fields), arrow::default_memory_pool(), - {std::make_shared(), std::make_shared(), - std::make_shared()}); - auto string_builder = dynamic_cast(struct_builder.field_builder(0)); - auto int_builder = dynamic_cast(struct_builder.field_builder(1)); - auto binary_builder = - dynamic_cast(struct_builder.field_builder(2)); - for (size_t i = 0; i < blob_descriptors.size(); ++i) { - EXPECT_TRUE(struct_builder.Append().ok()); - EXPECT_TRUE(string_builder->Append("str_" + std::to_string(i)).ok()); - if (i % 3 == 0) { - // test null - EXPECT_TRUE(int_builder->AppendNull().ok()); - } else { - EXPECT_TRUE(int_builder->Append(i).ok()); - } - EXPECT_TRUE( - binary_builder->Append(blob_descriptors[i]->data(), blob_descriptors[i]->size()) - .ok()); - } - std::shared_ptr array; - EXPECT_TRUE(struct_builder.Finish(&array).ok()); - return array; - }; - std::vector> blob_descriptors; std::string file1 = paimon::test::GetDataDir() + "/avro/data/avro_with_null"; ASSERT_OK_AND_ASSIGN(auto blob1, Blob::FromPath(file1)); @@ -3763,7 +3781,9 @@ TEST_P(WriteInteTest, TestAppendTableWriteWithBlobType) { ASSERT_OK_AND_ASSIGN(auto blob4, Blob::FromPath(file2, /*offset=*/300, /*length=*/3000)); blob_descriptors.emplace_back(blob4->ToDescriptor(pool_)); - auto array = generate_blob_array(blob_descriptors); + std::vector>> blob_fields; + blob_fields.emplace_back(std::move(blob_descriptors)); + auto array = GenerateBlobArray(fields, blob_fields); ::ArrowArray arrow_array; ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); RecordBatchBuilder batch_builder(&arrow_array); @@ -4537,5 +4557,432 @@ TEST_P(WriteInteTest, TestPkSpillableWithIOException) { } ASSERT_TRUE(run_complete); } +TEST_P(WriteInteTest, TestAppendTableWriteWithMultipleBlobFields) { + auto dir = UniqueTestDirectory::Create(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + BlobUtils::ToArrowField("blob1", false), BlobUtils::ToArrowField("blob2", false)}; + auto schema = arrow::schema(fields); + + auto file_format = GetParam(); + std::map options = {{Options::MANIFEST_FORMAT, "orc"}, + {Options::FILE_FORMAT, file_format}, + {Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::FILE_SYSTEM, "local"}, + {Options::BLOB_AS_DESCRIPTOR, "true"}, + {Options::BLOB_FIELD, "blob2,blob1"}}; + + ASSERT_OK_AND_ASSIGN( + auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, options, /*is_streaming_mode=*/true)); + int64_t commit_identifier = 0; + + // Prepare blob descriptors for both blob fields + std::vector> blob1_descriptors; + std::vector> blob2_descriptors; + + std::string file1 = paimon::test::GetDataDir() + "/avro/data/avro_with_null"; + ASSERT_OK_AND_ASSIGN(auto blob1_a, Blob::FromPath(file1)); + blob1_descriptors.emplace_back(blob1_a->ToDescriptor(pool_)); + + std::string file2 = paimon::test::GetDataDir() + "/xxhash.data"; + ASSERT_OK_AND_ASSIGN(auto blob1_b, Blob::FromPath(file2, /*offset=*/0, /*length=*/91)); + blob1_descriptors.emplace_back(blob1_b->ToDescriptor(pool_)); + ASSERT_OK_AND_ASSIGN(auto blob1_c, Blob::FromPath(file2, /*offset=*/92, /*length=*/85)); + blob1_descriptors.emplace_back(blob1_c->ToDescriptor(pool_)); + + // blob2 field uses different data slices + ASSERT_OK_AND_ASSIGN(auto blob2_a, Blob::FromPath(file2, /*offset=*/300, /*length=*/3000)); + blob2_descriptors.emplace_back(blob2_a->ToDescriptor(pool_)); + ASSERT_OK_AND_ASSIGN(auto blob2_b, Blob::FromPath(file2, /*offset=*/0, /*length=*/91)); + blob2_descriptors.emplace_back(blob2_b->ToDescriptor(pool_)); + ASSERT_OK_AND_ASSIGN(auto blob2_c, Blob::FromPath(file1)); + blob2_descriptors.emplace_back(blob2_c->ToDescriptor(pool_)); + + std::vector>> blob_fields; + blob_fields.emplace_back(std::move(blob1_descriptors)); + blob_fields.emplace_back(std::move(blob2_descriptors)); + auto array = GenerateBlobArray(fields, blob_fields); + ::ArrowArray arrow_array; + ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); + RecordBatchBuilder batch_builder(&arrow_array); + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch, batch_builder.Finish()); + + // Build expected DataFileMeta for verification + // main file: 3 rows, write_cols={"f0","f1"}, first_row_id=0 + auto expected_main = std::make_shared( + "data-xxx.xxx", /*file_size=*/0, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("str_0"), 1}, {std::string("str_2"), 2}, + std::vector({0, 1}), pool_.get()), + /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, /*first_row_id=*/0, + /*write_cols=*/std::vector({"f0", "f1"})); + expected_main = ReconstructDataFileMeta(expected_main); + + // blob1 file: 3 rows, write_cols={"blob1"}, first_row_id=0 + auto expected_blob1 = std::make_shared( + "data-xxx.blob", /*file_size=*/0, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({NullType()}, {NullType()}, std::vector({0}), + pool_.get()), + /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, /*first_row_id=*/0, + /*write_cols=*/std::vector({"blob1"})); + + // blob2 file: 3 rows, write_cols={"blob2"}, first_row_id=0 + auto expected_blob2 = std::make_shared( + "data-xxx.blob", /*file_size=*/0, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({NullType()}, {NullType()}, std::vector({0}), + pool_.get()), + /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(0, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, /*first_row_id=*/0, + /*write_cols=*/std::vector({"blob2"})); + + ASSERT_OK_AND_ASSIGN(auto commit_msgs, + helper->WriteAndCommit(std::move(batch), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + ASSERT_EQ(commit_msgs.size(), 1); + + ASSERT_OK_AND_ASSIGN(std::optional snapshot, helper->LatestSnapshot()); + ASSERT_TRUE(snapshot); + ASSERT_EQ(1, snapshot.value().Id()); + // 3 rows * 3 files (1 main + 1 blob1 + 1 blob2) = 9 total records + ASSERT_EQ(9, snapshot.value().TotalRecordCount().value()); + ASSERT_EQ(9, snapshot.value().DeltaRecordCount().value()); + ASSERT_EQ(3, snapshot.value().NextRowId().value()); + + // Check data file meta after commit + ASSERT_OK_AND_ASSIGN(std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_EQ(data_splits.size(), 1); + auto data_split = std::dynamic_pointer_cast(data_splits[0]); + ASSERT_EQ(data_split->DataFiles().size(), 3); + + // Verify each file meta by matching write_cols + for (const auto& actual_file : data_split->DataFiles()) { + if (!BlobUtils::IsBlobFile(actual_file->file_name)) { + ASSERT_TRUE(actual_file->TEST_Equal(*expected_main)); + } else { + ASSERT_TRUE(actual_file->write_cols.has_value()); + if (actual_file->write_cols->at(0) == "blob1") { + ASSERT_TRUE(actual_file->TEST_Equal(*expected_blob1)); + } else if (actual_file->write_cols->at(0) == "blob2") { + ASSERT_TRUE(actual_file->TEST_Equal(*expected_blob2)); + } else { + FAIL() << "Unexpected blob field: " << actual_file->write_cols->at(0); + } + } + } +} + +TEST_P(WriteInteTest, TestRowTrackingPartitionGroupOnCommit) { + auto dir = UniqueTestDirectory::Create(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + BlobUtils::ToArrowField("blob1", true), BlobUtils::ToArrowField("blob2", true)}; + auto schema = arrow::schema(fields); + + auto file_format = GetParam(); + std::map options = { + {Options::MANIFEST_FORMAT, "orc"}, + {Options::FILE_FORMAT, file_format}, + {Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::FILE_SYSTEM, "local"}, + {Options::BLOB_AS_DESCRIPTOR, "false"}}; + + std::vector partition_keys = {"f0"}; + ASSERT_OK_AND_ASSIGN(auto helper, TestHelper::Create(dir->Str(), schema, partition_keys, + /*primary_keys=*/{}, options, + /*is_streaming_mode=*/true)); + + // Write + PrepareCommit in interleaved order: pt1(2 rows) -> pt2(2 rows) -> pt1(1 row) + // Each write+prepareCommit produces separate commit messages. + // Without partition group, commit would assign row ids in message order: pt1, pt2, pt1 + // causing pt1's row ids to be non-contiguous (0,1 gap 4). + // With partition group, commit regroups by partition first, so pt1 gets 0,1,2 and pt2 gets + // 3,4. + auto write_and_prepare = [&](const std::string& data, + const std::map& partition_map, + int64_t identifier) { + EXPECT_OK_AND_ASSIGN( + auto batch, TestHelper::MakeRecordBatch(arrow::struct_(fields), data, partition_map, + /*bucket=*/0, {})); + + EXPECT_OK(helper->write_->Write(std::move(batch))); + EXPECT_OK_AND_ASSIGN(auto messages, helper->write_->PrepareCommit( + /*wait_compaction=*/false, identifier)); + return messages; + }; + + int64_t commit_identifier = 0; + auto msgs_pt1_1 = + write_and_prepare(R"([["pt1", 1, "apple", "red"], ["pt1", 2, "banana", "yellow"]])", + {{"f0", "pt1"}}, commit_identifier); + auto msgs_pt2 = + write_and_prepare(R"([["pt2", 10, "cat", "black"], ["pt2", 20, "dog", "white"]])", + {{"f0", "pt2"}}, commit_identifier); + auto msgs_pt1_2 = + write_and_prepare(R"([["pt1", 3, "eagle", "brown"]])", {{"f0", "pt1"}}, commit_identifier); + + // Merge all commit messages in interleaved order and commit once + std::vector> all_msgs; + all_msgs.insert(all_msgs.end(), msgs_pt1_1.begin(), msgs_pt1_1.end()); + all_msgs.insert(all_msgs.end(), msgs_pt2.begin(), msgs_pt2.end()); + all_msgs.insert(all_msgs.end(), msgs_pt1_2.begin(), msgs_pt1_2.end()); + ASSERT_OK(helper->commit_->Commit(all_msgs, commit_identifier++)); + + ASSERT_OK_AND_ASSIGN(std::optional snapshot, helper->LatestSnapshot()); + ASSERT_TRUE(snapshot); + ASSERT_EQ(5, snapshot.value().NextRowId().value()); + + ASSERT_OK_AND_ASSIGN(std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_EQ(data_splits.size(), 2); + + // Helper to verify a file's first_row_id and row_count + auto check_file = [](const std::shared_ptr& file, int64_t expected_row_id, + int64_t expected_row_count) { + ASSERT_EQ(expected_row_id, file->first_row_id.value()); + ASSERT_EQ(expected_row_count, file->row_count); + }; + + // Partition group order is non-deterministic (unordered_map), so we cannot + // assume which partition gets row id 0. Instead, verify: + // 1. Within each partition, main files have contiguous row ids + // 2. Blob files' first_row_id matches the corresponding main file + // 3. The two partitions' row id ranges do not overlap + int64_t pt1_start = -1, pt2_start = -1; + for (const auto& split : data_splits) { + auto data_split = std::dynamic_pointer_cast(split); + auto partition_value = data_split->Partition().GetStringView(0); + const auto& files = data_split->DataFiles(); + + if (partition_value == "pt1") { + // pt1: batch1(2 rows) + batch2(1 row) = 3 rows + // files: main1, blob1, blob2, main2, blob1, blob2 + ASSERT_EQ(6, files.size()); + pt1_start = files[0]->first_row_id.value(); + // main1: row_count=2 + check_file(files[0], pt1_start, 2); + check_file(files[1], pt1_start, 2); // blob1 for main1 + check_file(files[2], pt1_start, 2); // blob2 for main1 + // main2: contiguous after main1 + check_file(files[3], pt1_start + 2, 1); // main2 + check_file(files[4], pt1_start + 2, 1); // blob1 for main2 + check_file(files[5], pt1_start + 2, 1); // blob2 for main2 + } else { + ASSERT_EQ("pt2", partition_value); + // pt2: 2 rows + ASSERT_EQ(3, files.size()); + pt2_start = files[0]->first_row_id.value(); + check_file(files[0], pt2_start, 2); // main + check_file(files[1], pt2_start, 2); // blob1 + check_file(files[2], pt2_start, 2); // blob2 + } + } + // Two partitions' row id ranges must not overlap + // pt1 occupies [pt1_start, pt1_start+3), pt2 occupies [pt2_start, pt2_start+2) + ASSERT_TRUE(pt1_start + 3 <= pt2_start || pt2_start + 2 <= pt1_start) + << "pt1 range [" << pt1_start << "," << pt1_start + 3 << ") and pt2 range [" << pt2_start + << "," << pt2_start + 2 << ") overlap"; +} + +TEST_P(WriteInteTest, TestRowTrackingPartitionGroupOnCommitDisabled) { + auto dir = UniqueTestDirectory::Create(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + BlobUtils::ToArrowField("blob1", true), BlobUtils::ToArrowField("blob2", true)}; + auto schema = arrow::schema(fields); + + auto file_format = GetParam(); + std::map options = { + {Options::MANIFEST_FORMAT, "orc"}, + {Options::FILE_FORMAT, file_format}, + {Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT, "false"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::FILE_SYSTEM, "local"}, + {Options::BLOB_AS_DESCRIPTOR, "false"}}; + + std::vector partition_keys = {"f0"}; + ASSERT_OK_AND_ASSIGN(auto helper, TestHelper::Create(dir->Str(), schema, partition_keys, + /*primary_keys=*/{}, options, + /*is_streaming_mode=*/true)); + + // Write + PrepareCommit in interleaved order: pt1(2 rows) -> pt2(2 rows) -> pt1(1 row) + // Without partition group, row ids are assigned in message order: + // pt1 batch1: row_id 0,1 pt2: row_id 2,3 pt1 batch2: row_id 4 + // So pt1's row ids are NOT contiguous: [0,1] and [4]. + auto write_and_prepare = [&](const std::string& data, + const std::map& partition_map, + int64_t identifier) { + EXPECT_OK_AND_ASSIGN( + auto batch, TestHelper::MakeRecordBatch(arrow::struct_(fields), data, partition_map, + /*bucket=*/0, {})); + EXPECT_OK(helper->write_->Write(std::move(batch))); + EXPECT_OK_AND_ASSIGN(auto messages, helper->write_->PrepareCommit( + /*wait_compaction=*/false, identifier)); + return messages; + }; + + int64_t commit_identifier = 0; + auto msgs_pt1_1 = + write_and_prepare(R"([["pt1", 1, "apple", "red"], ["pt1", 2, "banana", "yellow"]])", + {{"f0", "pt1"}}, commit_identifier); + auto msgs_pt2 = + write_and_prepare(R"([["pt2", 10, "cat", "black"], ["pt2", 20, "dog", "white"]])", + {{"f0", "pt2"}}, commit_identifier); + auto msgs_pt1_2 = + write_and_prepare(R"([["pt1", 3, "eagle", "brown"]])", {{"f0", "pt1"}}, commit_identifier); + + std::vector> all_msgs; + all_msgs.insert(all_msgs.end(), msgs_pt1_1.begin(), msgs_pt1_1.end()); + all_msgs.insert(all_msgs.end(), msgs_pt2.begin(), msgs_pt2.end()); + all_msgs.insert(all_msgs.end(), msgs_pt1_2.begin(), msgs_pt1_2.end()); + ASSERT_OK(helper->commit_->Commit(all_msgs, commit_identifier++)); + + ASSERT_OK_AND_ASSIGN(std::optional snapshot, helper->LatestSnapshot()); + ASSERT_TRUE(snapshot); + ASSERT_EQ(5, snapshot.value().NextRowId().value()); + + ASSERT_OK_AND_ASSIGN(std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_EQ(data_splits.size(), 2); + + auto check_file = [](const std::shared_ptr& file, int64_t expected_row_id, + int64_t expected_row_count) { + ASSERT_EQ(expected_row_id, file->first_row_id.value()); + ASSERT_EQ(expected_row_count, file->row_count); + }; + + // Without partition group, row ids follow message order: pt1(0,1), pt2(2,3), pt1(4) + for (const auto& split : data_splits) { + auto data_split = std::dynamic_pointer_cast(split); + auto partition_value = data_split->Partition().GetStringView(0); + const auto& files = data_split->DataFiles(); + + if (partition_value == "pt1") { + // pt1 has two batches with a gap: batch1 at row_id=0, batch2 at row_id=4 + ASSERT_EQ(6, files.size()); + check_file(files[0], 0, 2); // main1 + check_file(files[1], 0, 2); // blob1 for main1 + check_file(files[2], 0, 2); // blob2 for main1 + check_file(files[3], 4, 1); // main2 (gap: pt2 took row_id 2,3) + check_file(files[4], 4, 1); // blob1 for main2 + check_file(files[5], 4, 1); // blob2 for main2 + // Verify row ids are NOT contiguous: main1 ends at 2, main2 starts at 4 + ASSERT_NE(files[0]->first_row_id.value() + files[0]->row_count, + files[3]->first_row_id.value()) + << "pt1 row ids should NOT be contiguous when partition group is disabled"; + } else { + ASSERT_EQ("pt2", partition_value); + ASSERT_EQ(3, files.size()); + check_file(files[0], 2, 2); // main + check_file(files[1], 2, 2); // blob1 + check_file(files[2], 2, 2); // blob2 + } + } +} + +TEST_P(WriteInteTest, TestMultipleBlobFieldsSplitByTargetSize) { + auto dir = UniqueTestDirectory::Create(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + BlobUtils::ToArrowField("blob1", true), BlobUtils::ToArrowField("blob2", true)}; + auto schema = arrow::schema(fields); + + auto file_format = GetParam(); + // Set a very small blob target file size to force splitting + std::map options = {{Options::MANIFEST_FORMAT, "orc"}, + {Options::FILE_FORMAT, file_format}, + {Options::BUCKET, "-1"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::FILE_SYSTEM, "local"}, + {Options::BLOB_AS_DESCRIPTOR, "false"}, + {Options::BLOB_TARGET_FILE_SIZE, "1"}}; + + ASSERT_OK_AND_ASSIGN( + auto helper, TestHelper::Create(dir->Str(), schema, /*partition_keys=*/{}, + /*primary_keys=*/{}, options, /*is_streaming_mode=*/true)); + int64_t commit_identifier = 0; + + // Write 3 rows — with target size = 1 byte, each row in each blob field should be a + // separate blob file. So we expect: 1 main file + 3 blob1 files + 3 blob2 files = 7 files. + std::string data = R"([ + ["str_0", null, "apple_data_long_enough", "red_data_long_enough"], + ["str_1", 1, "banana_data_long_enough", "yellow_data_long_enough"], + ["str_2", 2, "cat_data_long_enough", "black_data_long_enough"] + ])"; + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch, + TestHelper::MakeRecordBatch(arrow::struct_(fields), data, + /*partition_map=*/{}, /*bucket=*/0, {})); + + ASSERT_OK_AND_ASSIGN(auto commit_msgs, + helper->WriteAndCommit(std::move(batch), commit_identifier++, + /*expected_commit_messages=*/std::nullopt)); + ASSERT_EQ(commit_msgs.size(), 1); + + ASSERT_OK_AND_ASSIGN(std::optional snapshot, helper->LatestSnapshot()); + ASSERT_TRUE(snapshot); + ASSERT_EQ(1, snapshot.value().Id()); + ASSERT_EQ(3, snapshot.value().NextRowId().value()); + + ASSERT_OK_AND_ASSIGN(std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + ASSERT_EQ(data_splits.size(), 1); + auto data_split = std::dynamic_pointer_cast(data_splits[0]); + ASSERT_TRUE(data_split); + + // 1 main file + 3 blob1 files + 3 blob2 files = 7 files total + // File order: main, blob1(row0), blob1(row1), blob1(row2), blob2(row0), blob2(row1), + // blob2(row2) + const auto& files = data_split->DataFiles(); + ASSERT_EQ(files.size(), 7); + + auto check_file = [](const std::shared_ptr& file, int64_t expected_row_id, + int64_t expected_row_count) { + ASSERT_EQ(expected_row_id, file->first_row_id.value()); + ASSERT_EQ(expected_row_count, file->row_count); + }; + + // files[0]: main file + ASSERT_FALSE(BlobUtils::IsBlobFile(files[0]->file_name)); + check_file(files[0], 0, 3); + + // files[1..3]: blob1 files, each row_count=1, first_row_id=0,1,2 + for (int i = 0; i < 3; ++i) { + ASSERT_TRUE(BlobUtils::IsBlobFile(files[1 + i]->file_name)); + ASSERT_EQ("blob1", files[1 + i]->write_cols->at(0)); + check_file(files[1 + i], i, 1); + } + + // files[4..6]: blob2 files, each row_count=1, first_row_id=0,1,2 + for (int i = 0; i < 3; ++i) { + ASSERT_TRUE(BlobUtils::IsBlobFile(files[4 + i]->file_name)); + ASSERT_EQ("blob2", files[4 + i]->write_cols->at(0)); + check_file(files[4 + i], i, 1); + } +} } // namespace paimon::test