diff --git a/src/paimon/global_index/lumina/lumina_global_index.cpp b/src/paimon/global_index/lumina/lumina_global_index.cpp index 041ed53e8..be4bdb92e 100644 --- a/src/paimon/global_index/lumina/lumina_global_index.cpp +++ b/src/paimon/global_index/lumina/lumina_global_index.cpp @@ -177,8 +177,12 @@ Result> LuminaGlobalIndex::CreateReader( class LuminaDataset : public ::lumina::api::Dataset { public: LuminaDataset(int64_t element_count, uint32_t dimension, - const std::vector>& array_vec) - : element_count_(element_count), dimension_(dimension), array_vec_(array_vec) {} + const std::vector>& array_vec, + const std::vector& start_ids) + : element_count_(element_count), + dimension_(dimension), + array_vec_(array_vec), + start_ids_(start_ids) {} uint32_t Dim() const noexcept override { return dimension_; @@ -200,8 +204,8 @@ class LuminaDataset : public ::lumina::api::Dataset { vector_buffer.resize(value_array_length); memcpy(vector_buffer.data(), value_ptr, sizeof(float) * value_array_length); id_buffer.resize(element_count); - std::iota(id_buffer.begin(), id_buffer.end(), id_); - id_ += element_count; + std::iota(id_buffer.begin(), id_buffer.end(), + static_cast<::lumina::core::vector_id_t>(start_ids_[cursor_])); // release the array when copy to vector_buffer value_array.reset(); @@ -213,8 +217,8 @@ class LuminaDataset : public ::lumina::api::Dataset { int64_t element_count_; uint32_t dimension_; std::vector> array_vec_; + std::vector start_ids_; size_t cursor_ = 0; - ::lumina::core::vector_id_t id_ = 0; }; LuminaIndexWriter::LuminaIndexWriter(const std::string& field_name, @@ -254,35 +258,62 @@ Status LuminaIndexWriter::AddBatch(::ArrowArray* arrow_array, auto list_field_array = std::dynamic_pointer_cast(field_array); CHECK_NOT_NULL(list_field_array, "invalid input array in LuminaIndexWriter, field array must be list array"); - auto value_array = std::dynamic_pointer_cast(list_field_array->values()); - CHECK_NOT_NULL( - value_array, - "invalid input array in LuminaIndexWriter, field value array must be float array"); - if (value_array->null_count() != 0) { - return Status::Invalid("field value array in LuminaIndexWriter is invalid, must not null"); - } - if (value_array->length() != field_length * dimension_) { - return Status::Invalid(fmt::format( - "invalid input array in LuminaIndexWriter, length of field array [{}] multiplied " - "dimension [{}] must match length of field value array [{}]", - field_length, dimension_, value_array->length())); + + // Split into contiguous non-null segments, skipping null rows in the list field. + int64_t segment_start = -1; + for (int64_t i = 0; i <= field_length; i++) { + bool is_null = (i < field_length) && list_field_array->IsNull(i); + bool is_end = (i == field_length); + + if (!is_null && !is_end && segment_start == -1) { + segment_start = i; + } + + if ((is_null || is_end) && segment_start != -1) { + int64_t segment_len = i - segment_start; + // Use value_offset to precisely locate the float range for this segment + auto value_start_offset = list_field_array->value_offset(segment_start); + auto value_end_offset = list_field_array->value_offset(segment_start + segment_len); + int64_t value_length = value_end_offset - value_start_offset; + auto sliced_values = std::dynamic_pointer_cast( + list_field_array->values()->Slice(value_start_offset, value_length)); + CHECK_NOT_NULL(sliced_values, + "invalid sliced value array in LuminaIndexWriter, must be float array"); + if (sliced_values->null_count() != 0) { + return Status::Invalid( + "field value array in LuminaIndexWriter is invalid, must not null"); + } + if (sliced_values->length() != segment_len * static_cast(dimension_)) { + return Status::Invalid(fmt::format( + "invalid input array in LuminaIndexWriter, length of field array [{}] " + "multiplied dimension [{}] must match length of field value array [{}]", + segment_len, dimension_, sliced_values->length())); + } + array_vec_.push_back(std::move(sliced_values)); + array_start_ids_.push_back(count_ + segment_start); + indexed_count_ += segment_len; + segment_start = -1; + } } + count_ += array->length(); - array_vec_.push_back(std::move(value_array)); return Status::OK(); } Result> LuminaIndexWriter::Finish() { + if (indexed_count_ == 0) { + return std::vector(); + } ::lumina::core::MemoryResourceConfig memory_resource(pool_.get()); PAIMON_ASSIGN_OR_RAISE_FROM_LUMINA( ::lumina::api::LuminaBuilder builder, ::lumina::api::LuminaBuilder::Create(builder_options_, memory_resource)); // pretrain - LuminaDataset dataset1(count_, dimension_, array_vec_); + LuminaDataset dataset1(indexed_count_, dimension_, array_vec_, array_start_ids_); PAIMON_RETURN_NOT_OK_FROM_LUMINA(builder.PretrainFrom(dataset1)); // insert data - LuminaDataset dataset2(count_, dimension_, array_vec_); + LuminaDataset dataset2(indexed_count_, dimension_, array_vec_, array_start_ids_); std::vector>().swap(array_vec_); PAIMON_RETURN_NOT_OK_FROM_LUMINA(builder.InsertFrom(dataset2)); diff --git a/src/paimon/global_index/lumina/lumina_global_index.h b/src/paimon/global_index/lumina/lumina_global_index.h index 0c5a11ab5..cbd6c7d9d 100644 --- a/src/paimon/global_index/lumina/lumina_global_index.h +++ b/src/paimon/global_index/lumina/lumina_global_index.h @@ -90,6 +90,7 @@ class LuminaIndexWriter : public GlobalIndexWriter { private: int64_t count_ = 0; + int64_t indexed_count_ = 0; std::shared_ptr pool_; std::string field_name_; std::shared_ptr arrow_type_; @@ -99,6 +100,7 @@ class LuminaIndexWriter : public GlobalIndexWriter { ::lumina::api::IOOptions io_options_; std::map lumina_options_; std::vector> array_vec_; + std::vector array_start_ids_; }; class LuminaIndexReader : public GlobalIndexReader { diff --git a/src/paimon/global_index/lumina/lumina_global_index_test.cpp b/src/paimon/global_index/lumina/lumina_global_index_test.cpp index d87fb707b..e5aeca71a 100644 --- a/src/paimon/global_index/lumina/lumina_global_index_test.cpp +++ b/src/paimon/global_index/lumina/lumina_global_index_test.cpp @@ -158,7 +158,7 @@ class LuminaGlobalIndexTest : public ::testing::Test { return struct_array; } - private: + protected: std::shared_ptr pool_ = GetDefaultPool(); std::shared_ptr fs_ = std::make_shared(); std::map options_ = {{"lumina.index.dimension", "4"}, @@ -470,4 +470,201 @@ TEST_F(LuminaGlobalIndexTest, TestHighCardinalityAndMultiThreadSearch) { } } +TEST_F(LuminaGlobalIndexTest, TestWriteWithNullRows) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + // Array with null at row 1 (middle): rows 0,2,3 are valid, row 1 is null + // This should split into two segments: [0,0] and [2,3] + std::shared_ptr array_with_null = + arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [[0.0, 0.0, 0.0, 0.0]], + [null], + [[1.0, 0.0, 1.0, 0.0]], + [[1.0, 1.0, 1.0, 1.0]] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN( + auto meta, WriteGlobalIndex(test_root, data_type_, options_, array_with_null, Range(0, 3))); + ASSERT_OK_AND_ASSIGN(auto reader, + CreateGlobalIndexReader(test_root, data_type_, options_, meta)); + { + // Search should return ids 0, 2, 3 (skipping null row 1) + ASSERT_OK_AND_ASSIGN( + auto scored_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/4, query_, /*filter=*/nullptr, + /*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_))); + // Only 3 vectors indexed (row 1 is null), so limit=4 returns 3 + CheckResult(scored_result, {3l, 2l, 0l}, {0.01f, 2.21f, 4.21f}); + } +} + +TEST_F(LuminaGlobalIndexTest, TestWriteWithMultipleNullSegments) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + // Nulls at rows 0, 2, 5: valid rows are 1, 3, 4 + // Splits into segments: [1,1], [3,4] + std::shared_ptr array_with_nulls = + arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [null], + [[0.0, 1.0, 0.0, 1.0]], + [null], + [[1.0, 0.0, 1.0, 0.0]], + [[1.0, 1.0, 1.0, 1.0]], + [null] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN(auto meta, WriteGlobalIndex(test_root, data_type_, options_, + array_with_nulls, Range(0, 5))); + ASSERT_OK_AND_ASSIGN(auto reader, + CreateGlobalIndexReader(test_root, data_type_, options_, meta)); + { + ASSERT_OK_AND_ASSIGN( + auto scored_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/4, query_, /*filter=*/nullptr, + /*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_))); + // Only 3 vectors indexed at ids 1, 3, 4 + CheckResult(scored_result, {4l, 1l, 3l}, {0.01f, 2.01f, 2.21f}); + } +} + +TEST_F(LuminaGlobalIndexTest, TestWriteWithAllNullRows) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + // All rows are null — no vectors to index + std::shared_ptr all_null_array = + arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [null], + [null], + [null] + ])") + .ValueOrDie(); + + auto global_index = std::make_shared(options_); + auto path_factory = std::make_shared(test_root); + auto file_writer = std::make_shared(fs_, path_factory); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr global_writer, + global_index->CreateWriter("f0", CreateArrowSchema(data_type_).get(), file_writer, pool_)); + + ArrowArray c_array; + ASSERT_TRUE(arrow::ExportArray(*all_null_array, &c_array).ok()); + std::vector row_ids = {0, 1, 2}; + ASSERT_OK(global_writer->AddBatch(&c_array, std::move(row_ids))); + // Finish with zero indexed vectors — returns empty metas + ASSERT_OK_AND_ASSIGN(auto result_metas, global_writer->Finish()); + ASSERT_TRUE(result_metas.empty()); +} + +TEST_F(LuminaGlobalIndexTest, TestWriteWithNullAndFilter) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + // Null at row 2: valid rows are 0, 1, 3 + std::shared_ptr array_with_null = + arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [[0.0, 0.0, 0.0, 0.0]], + [[0.0, 1.0, 0.0, 1.0]], + [null], + [[1.0, 1.0, 1.0, 1.0]] + ])") + .ValueOrDie(); + + ASSERT_OK_AND_ASSIGN( + auto meta, WriteGlobalIndex(test_root, data_type_, options_, array_with_null, Range(0, 3))); + ASSERT_OK_AND_ASSIGN(auto reader, + CreateGlobalIndexReader(test_root, data_type_, options_, meta)); + { + // Filter: only allow ids < 3 (filters out id=3), so only ids 0, 1 remain + auto filter = [](int64_t id) -> bool { return id < 3; }; + ASSERT_OK_AND_ASSIGN( + auto scored_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/4, query_, filter, + /*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_))); + CheckResult(scored_result, {1l, 0l}, {2.01f, 4.21f}); + } +} + +TEST_F(LuminaGlobalIndexTest, TestWriteWithNullAcrossMultipleBatches) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + // Batch 1: rows 0-2, null at row 1 → indexed ids: {0, 2} + std::shared_ptr batch1 = arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [[0.0, 0.0, 0.0, 0.0]], + [null], + [[1.0, 0.0, 1.0, 0.0]] + ])") + .ValueOrDie(); + + // Batch 2: rows 3-5, null at row 3 → indexed ids: {4, 5} + std::shared_ptr batch2 = arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + [null], + [[1.0, 1.0, 1.0, 1.0]], + [[0.0, 1.0, 0.0, 1.0]] + ])") + .ValueOrDie(); + + auto global_index = std::make_shared(options_); + auto path_factory = std::make_shared(test_root); + auto file_writer = std::make_shared(fs_, path_factory); + + ASSERT_OK_AND_ASSIGN( + std::shared_ptr global_writer, + global_index->CreateWriter("f0", CreateArrowSchema(data_type_).get(), file_writer, pool_)); + + // AddBatch 1: row_ids {0, 1, 2} + { + ArrowArray c_array; + ASSERT_TRUE(arrow::ExportArray(*batch1, &c_array).ok()); + std::vector row_ids = {0, 1, 2}; + ASSERT_OK(global_writer->AddBatch(&c_array, std::move(row_ids))); + } + // AddBatch 2: row_ids {3, 4, 5} + { + ArrowArray c_array; + ASSERT_TRUE(arrow::ExportArray(*batch2, &c_array).ok()); + std::vector row_ids = {3, 4, 5}; + ASSERT_OK(global_writer->AddBatch(&c_array, std::move(row_ids))); + } + + ASSERT_OK_AND_ASSIGN(auto result_metas, global_writer->Finish()); + ASSERT_EQ(result_metas.size(), 1); + + ASSERT_OK_AND_ASSIGN(auto reader, + CreateGlobalIndexReader(test_root, data_type_, options_, result_metas[0])); + { + // Search all: should return ids {0, 2, 4, 5}, never {1, 3} + ASSERT_OK_AND_ASSIGN( + auto scored_result, + reader->VisitVectorSearch(std::make_shared( + /*field_name=*/"f0", /*limit=*/10, query_, /*filter=*/nullptr, + /*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_))); + // id 0: [0,0,0,0] → L2 dist to [1,1,1,1.1] = 4.21 + // id 2: [1,0,1,0] → L2 dist = 2.21 + // id 4: [1,1,1,1] → L2 dist = 0.01 + // id 5: [0,1,0,1] → L2 dist = 2.01 + CheckResult(scored_result, {4l, 5l, 2l, 0l}, {0.01f, 2.01f, 2.21f, 4.21f}); + } +} + } // namespace paimon::lumina::test diff --git a/test/inte/global_index_test.cpp b/test/inte/global_index_test.cpp index 0dc7b9ab2..aa23169f1 100644 --- a/test/inte/global_index_test.cpp +++ b/test/inte/global_index_test.cpp @@ -989,8 +989,10 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { ["Bob", [0.0, 1.0, 0.0, 1.0], 10, 12.1], ["Emily", [1.0, 0.0, 1.0, 0.0], 10, 13.1], ["Tony", [1.0, 1.0, 1.0, 1.0], 10, 14.1], +["NullGuy1", null, 10, 20.0], ["Lucy", [10.0, 10.0, 10.0, 10.0], 20, 15.1], ["Bob", [10.0, 11.0, 10.0, 11.0], 20, 16.1], +["NullGuy2", null, 20, 21.0], ["Tony", [11.0, 10.0, 11.0, 10.0], 20, 17.1], ["Alice", [11.0, 11.0, 11.0, 11.0], 20, 18.1], ["Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1] @@ -1002,7 +1004,7 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { // write and commit lumina index ASSERT_OK(WriteIndex(table_path, /*partition_filters=*/{}, "f1", "lumina", - /*options=*/lumina_options, Range(0, 8))); + /*options=*/lumina_options, Range(0, 10))); auto scan_and_check_result = [&](const std::vector& read_row_ranges, const std::shared_ptr& expected_array, @@ -1020,8 +1022,8 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { result_fields.insert(result_fields.begin(), SpecialFields::ValueKind().ArrowField()); result_fields.push_back(SpecialFields::IndexScore().ArrowField()); std::map id_to_score = {{0, 4.21f}, {1, 2.01f}, {2, 2.21f}, - {3, 0.01f}, {4, 322.21f}, {5, 360.01f}, - {6, 360.21f}, {7, 398.01}, {8, 322.21f}}; + {3, 0.01f}, {5, 322.21f}, {6, 360.01f}, + {8, 360.21f}, {9, 398.01f}, {10, 322.21f}}; { auto expected_array = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(result_fields), R"([ @@ -1036,7 +1038,8 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { [0, "Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1, 322.21] ])") .ValueOrDie(); - scan_and_check_result({Range(0, 8)}, expected_array, id_to_score); + scan_and_check_result({Range(0, 3), Range(5, 6), Range(8, 10)}, expected_array, + id_to_score); } { auto expected_array = @@ -1047,7 +1050,7 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { [0, "Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1, 322.21] ])") .ValueOrDie(); - scan_and_check_result({Range(2, 3), Range(7, 8)}, expected_array, id_to_score); + scan_and_check_result({Range(2, 3), Range(9, 10)}, expected_array, id_to_score); } { auto expected_array = @@ -1055,7 +1058,7 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { [0, "Bob", [10.0, 11.0, 10.0, 11.0], 20, 16.1, 360.01] ])") .ValueOrDie(); - scan_and_check_result({Range(5, 5)}, expected_array, id_to_score); + scan_and_check_result({Range(6, 6)}, expected_array, id_to_score); } { auto expected_array = @@ -1066,7 +1069,31 @@ TEST_P(GlobalIndexTest, TestWriteCommitScanReadIndexWithScore) { [0, "Paul", [10.0, 10.0, 10.0, 10.0], 20, 19.1, null] ])") .ValueOrDie(); - scan_and_check_result({Range(2, 3), Range(7, 8)}, expected_array, /*id_to_score=*/{}); + scan_and_check_result({Range(2, 3), Range(9, 10)}, expected_array, /*id_to_score=*/{}); + } + { + // Verify null rows (id 4 and 7) are never recalled by vector search + ASSERT_OK_AND_ASSIGN( + std::shared_ptr global_index_scan, + GlobalIndexScan::Create(table_path, /*snapshot_id=*/std::nullopt, + /*partitions=*/std::nullopt, lumina_options, fs_, + /*executor=*/nullptr, pool_)); + ASSERT_OK_AND_ASSIGN(auto lumina_readers, + global_index_scan->CreateReaders("f1", std::nullopt)); + ASSERT_EQ(lumina_readers.size(), 1u); + std::vector query = {1.0f, 1.0f, 1.0f, 1.1f}; + auto vector_search = std::make_shared( + "f1", /*limit=*/20, query, /*filter=*/nullptr, + /*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/lumina_options); + ASSERT_OK_AND_ASSIGN(auto scored_result, + lumina_readers[0]->VisitVectorSearch(vector_search)); + auto typed_result = std::dynamic_pointer_cast(scored_result); + ASSERT_TRUE(typed_result); + // Should recall 9 vectors (11 total rows - 2 null rows) + ASSERT_EQ(typed_result->bitmap_.Cardinality(), 9u); + // Null row ids 4 and 7 must not be in the result + ASSERT_FALSE(typed_result->bitmap_.Contains(4)); + ASSERT_FALSE(typed_result->bitmap_.Contains(7)); } } #endif