Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 51 additions & 20 deletions src/paimon/global_index/lumina/lumina_global_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,12 @@ Result<std::shared_ptr<GlobalIndexReader>> LuminaGlobalIndex::CreateReader(
class LuminaDataset : public ::lumina::api::Dataset {
public:
LuminaDataset(int64_t element_count, uint32_t dimension,
const std::vector<std::shared_ptr<arrow::FloatArray>>& array_vec)
: element_count_(element_count), dimension_(dimension), array_vec_(array_vec) {}
const std::vector<std::shared_ptr<arrow::FloatArray>>& array_vec,
const std::vector<int64_t>& start_ids)
: element_count_(element_count),
dimension_(dimension),
array_vec_(array_vec),
start_ids_(start_ids) {}

uint32_t Dim() const noexcept override {
return dimension_;
Expand All @@ -200,8 +204,8 @@ class LuminaDataset : public ::lumina::api::Dataset {
vector_buffer.resize(value_array_length);
memcpy(vector_buffer.data(), value_ptr, sizeof(float) * value_array_length);
id_buffer.resize(element_count);
std::iota(id_buffer.begin(), id_buffer.end(), id_);
id_ += element_count;
std::iota(id_buffer.begin(), id_buffer.end(),
static_cast<::lumina::core::vector_id_t>(start_ids_[cursor_]));

// release the array when copy to vector_buffer
value_array.reset();
Expand All @@ -213,8 +217,8 @@ class LuminaDataset : public ::lumina::api::Dataset {
int64_t element_count_;
uint32_t dimension_;
std::vector<std::shared_ptr<arrow::FloatArray>> array_vec_;
std::vector<int64_t> start_ids_;
size_t cursor_ = 0;
::lumina::core::vector_id_t id_ = 0;
};

LuminaIndexWriter::LuminaIndexWriter(const std::string& field_name,
Expand Down Expand Up @@ -254,35 +258,62 @@ Status LuminaIndexWriter::AddBatch(::ArrowArray* arrow_array,
auto list_field_array = std::dynamic_pointer_cast<arrow::ListArray>(field_array);
CHECK_NOT_NULL(list_field_array,
"invalid input array in LuminaIndexWriter, field array must be list array");
auto value_array = std::dynamic_pointer_cast<arrow::FloatArray>(list_field_array->values());
CHECK_NOT_NULL(
value_array,
"invalid input array in LuminaIndexWriter, field value array must be float array");
if (value_array->null_count() != 0) {
return Status::Invalid("field value array in LuminaIndexWriter is invalid, must not null");
}
if (value_array->length() != field_length * dimension_) {
return Status::Invalid(fmt::format(
"invalid input array in LuminaIndexWriter, length of field array [{}] multiplied "
"dimension [{}] must match length of field value array [{}]",
field_length, dimension_, value_array->length()));

// Split into contiguous non-null segments, skipping null rows in the list field.
int64_t segment_start = -1;
for (int64_t i = 0; i <= field_length; i++) {
bool is_null = (i < field_length) && list_field_array->IsNull(i);
bool is_end = (i == field_length);

if (!is_null && !is_end && segment_start == -1) {
segment_start = i;
}

if ((is_null || is_end) && segment_start != -1) {
int64_t segment_len = i - segment_start;
// Use value_offset to precisely locate the float range for this segment
auto value_start_offset = list_field_array->value_offset(segment_start);
auto value_end_offset = list_field_array->value_offset(segment_start + segment_len);
int64_t value_length = value_end_offset - value_start_offset;
auto sliced_values = std::dynamic_pointer_cast<arrow::FloatArray>(
list_field_array->values()->Slice(value_start_offset, value_length));
CHECK_NOT_NULL(sliced_values,
"invalid sliced value array in LuminaIndexWriter, must be float array");
if (sliced_values->null_count() != 0) {
return Status::Invalid(
"field value array in LuminaIndexWriter is invalid, must not null");
}
if (sliced_values->length() != segment_len * static_cast<int64_t>(dimension_)) {
return Status::Invalid(fmt::format(
"invalid input array in LuminaIndexWriter, length of field array [{}] "
"multiplied dimension [{}] must match length of field value array [{}]",
segment_len, dimension_, sliced_values->length()));
}
array_vec_.push_back(std::move(sliced_values));
array_start_ids_.push_back(count_ + segment_start);
indexed_count_ += segment_len;
segment_start = -1;
}
}

count_ += array->length();
array_vec_.push_back(std::move(value_array));
return Status::OK();
}

Result<std::vector<GlobalIndexIOMeta>> LuminaIndexWriter::Finish() {
if (indexed_count_ == 0) {
return std::vector<GlobalIndexIOMeta>();
}
::lumina::core::MemoryResourceConfig memory_resource(pool_.get());
PAIMON_ASSIGN_OR_RAISE_FROM_LUMINA(
::lumina::api::LuminaBuilder builder,
::lumina::api::LuminaBuilder::Create(builder_options_, memory_resource));
// pretrain
LuminaDataset dataset1(count_, dimension_, array_vec_);
LuminaDataset dataset1(indexed_count_, dimension_, array_vec_, array_start_ids_);
PAIMON_RETURN_NOT_OK_FROM_LUMINA(builder.PretrainFrom(dataset1));

// insert data
LuminaDataset dataset2(count_, dimension_, array_vec_);
LuminaDataset dataset2(indexed_count_, dimension_, array_vec_, array_start_ids_);
std::vector<std::shared_ptr<arrow::FloatArray>>().swap(array_vec_);
PAIMON_RETURN_NOT_OK_FROM_LUMINA(builder.InsertFrom(dataset2));

Expand Down
2 changes: 2 additions & 0 deletions src/paimon/global_index/lumina/lumina_global_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class LuminaIndexWriter : public GlobalIndexWriter {

private:
int64_t count_ = 0;
int64_t indexed_count_ = 0;
std::shared_ptr<LuminaMemoryPool> pool_;
std::string field_name_;
std::shared_ptr<arrow::DataType> arrow_type_;
Expand All @@ -99,6 +100,7 @@ class LuminaIndexWriter : public GlobalIndexWriter {
::lumina::api::IOOptions io_options_;
std::map<std::string, std::string> lumina_options_;
std::vector<std::shared_ptr<arrow::FloatArray>> array_vec_;
std::vector<int64_t> array_start_ids_;
};

class LuminaIndexReader : public GlobalIndexReader {
Expand Down
199 changes: 198 additions & 1 deletion src/paimon/global_index/lumina/lumina_global_index_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class LuminaGlobalIndexTest : public ::testing::Test {
return struct_array;
}

private:
protected:
std::shared_ptr<MemoryPool> pool_ = GetDefaultPool();
std::shared_ptr<FileSystem> fs_ = std::make_shared<LocalFileSystem>();
std::map<std::string, std::string> options_ = {{"lumina.index.dimension", "4"},
Expand Down Expand Up @@ -470,4 +470,201 @@ TEST_F(LuminaGlobalIndexTest, TestHighCardinalityAndMultiThreadSearch) {
}
}

TEST_F(LuminaGlobalIndexTest, TestWriteWithNullRows) {
auto test_root_dir = paimon::test::UniqueTestDirectory::Create();
ASSERT_TRUE(test_root_dir);
std::string test_root = test_root_dir->Str();

// Array with null at row 1 (middle): rows 0,2,3 are valid, row 1 is null
// This should split into two segments: [0,0] and [2,3]
std::shared_ptr<arrow::Array> array_with_null =
arrow::ipc::internal::json::ArrayFromJSON(data_type_,
R"([
[[0.0, 0.0, 0.0, 0.0]],
[null],
[[1.0, 0.0, 1.0, 0.0]],
[[1.0, 1.0, 1.0, 1.0]]
])")
.ValueOrDie();

ASSERT_OK_AND_ASSIGN(
auto meta, WriteGlobalIndex(test_root, data_type_, options_, array_with_null, Range(0, 3)));
ASSERT_OK_AND_ASSIGN(auto reader,
CreateGlobalIndexReader(test_root, data_type_, options_, meta));
{
// Search should return ids 0, 2, 3 (skipping null row 1)
ASSERT_OK_AND_ASSIGN(
auto scored_result,
reader->VisitVectorSearch(std::make_shared<VectorSearch>(
/*field_name=*/"f0", /*limit=*/4, query_, /*filter=*/nullptr,
/*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_)));
// Only 3 vectors indexed (row 1 is null), so limit=4 returns 3
CheckResult(scored_result, {3l, 2l, 0l}, {0.01f, 2.21f, 4.21f});
}
}

TEST_F(LuminaGlobalIndexTest, TestWriteWithMultipleNullSegments) {
auto test_root_dir = paimon::test::UniqueTestDirectory::Create();
ASSERT_TRUE(test_root_dir);
std::string test_root = test_root_dir->Str();

// Nulls at rows 0, 2, 5: valid rows are 1, 3, 4
// Splits into segments: [1,1], [3,4]
std::shared_ptr<arrow::Array> array_with_nulls =
arrow::ipc::internal::json::ArrayFromJSON(data_type_,
R"([
[null],
[[0.0, 1.0, 0.0, 1.0]],
[null],
[[1.0, 0.0, 1.0, 0.0]],
[[1.0, 1.0, 1.0, 1.0]],
[null]
])")
.ValueOrDie();

ASSERT_OK_AND_ASSIGN(auto meta, WriteGlobalIndex(test_root, data_type_, options_,
array_with_nulls, Range(0, 5)));
ASSERT_OK_AND_ASSIGN(auto reader,
CreateGlobalIndexReader(test_root, data_type_, options_, meta));
{
ASSERT_OK_AND_ASSIGN(
auto scored_result,
reader->VisitVectorSearch(std::make_shared<VectorSearch>(
/*field_name=*/"f0", /*limit=*/4, query_, /*filter=*/nullptr,
/*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_)));
// Only 3 vectors indexed at ids 1, 3, 4
CheckResult(scored_result, {4l, 1l, 3l}, {0.01f, 2.01f, 2.21f});
}
}

TEST_F(LuminaGlobalIndexTest, TestWriteWithAllNullRows) {
auto test_root_dir = paimon::test::UniqueTestDirectory::Create();
ASSERT_TRUE(test_root_dir);
std::string test_root = test_root_dir->Str();

// All rows are null — no vectors to index
std::shared_ptr<arrow::Array> all_null_array =
arrow::ipc::internal::json::ArrayFromJSON(data_type_,
R"([
[null],
[null],
[null]
])")
.ValueOrDie();

auto global_index = std::make_shared<LuminaGlobalIndex>(options_);
auto path_factory = std::make_shared<FakeIndexPathFactory>(test_root);
auto file_writer = std::make_shared<GlobalIndexFileManager>(fs_, path_factory);

ASSERT_OK_AND_ASSIGN(
std::shared_ptr<GlobalIndexWriter> global_writer,
global_index->CreateWriter("f0", CreateArrowSchema(data_type_).get(), file_writer, pool_));

ArrowArray c_array;
ASSERT_TRUE(arrow::ExportArray(*all_null_array, &c_array).ok());
std::vector<int64_t> row_ids = {0, 1, 2};
ASSERT_OK(global_writer->AddBatch(&c_array, std::move(row_ids)));
// Finish with zero indexed vectors — returns empty metas
ASSERT_OK_AND_ASSIGN(auto result_metas, global_writer->Finish());
ASSERT_TRUE(result_metas.empty());
}

TEST_F(LuminaGlobalIndexTest, TestWriteWithNullAndFilter) {
auto test_root_dir = paimon::test::UniqueTestDirectory::Create();
ASSERT_TRUE(test_root_dir);
std::string test_root = test_root_dir->Str();

// Null at row 2: valid rows are 0, 1, 3
std::shared_ptr<arrow::Array> array_with_null =
arrow::ipc::internal::json::ArrayFromJSON(data_type_,
R"([
[[0.0, 0.0, 0.0, 0.0]],
[[0.0, 1.0, 0.0, 1.0]],
[null],
[[1.0, 1.0, 1.0, 1.0]]
])")
.ValueOrDie();

ASSERT_OK_AND_ASSIGN(
auto meta, WriteGlobalIndex(test_root, data_type_, options_, array_with_null, Range(0, 3)));
ASSERT_OK_AND_ASSIGN(auto reader,
CreateGlobalIndexReader(test_root, data_type_, options_, meta));
{
// Filter: only allow ids < 3 (filters out id=3), so only ids 0, 1 remain
auto filter = [](int64_t id) -> bool { return id < 3; };
ASSERT_OK_AND_ASSIGN(
auto scored_result,
reader->VisitVectorSearch(std::make_shared<VectorSearch>(
/*field_name=*/"f0", /*limit=*/4, query_, filter,
/*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_)));
CheckResult(scored_result, {1l, 0l}, {2.01f, 4.21f});
}
}

TEST_F(LuminaGlobalIndexTest, TestWriteWithNullAcrossMultipleBatches) {
auto test_root_dir = paimon::test::UniqueTestDirectory::Create();
ASSERT_TRUE(test_root_dir);
std::string test_root = test_root_dir->Str();

// Batch 1: rows 0-2, null at row 1 → indexed ids: {0, 2}
std::shared_ptr<arrow::Array> batch1 = arrow::ipc::internal::json::ArrayFromJSON(data_type_,
R"([
[[0.0, 0.0, 0.0, 0.0]],
[null],
[[1.0, 0.0, 1.0, 0.0]]
])")
.ValueOrDie();

// Batch 2: rows 3-5, null at row 3 → indexed ids: {4, 5}
std::shared_ptr<arrow::Array> batch2 = arrow::ipc::internal::json::ArrayFromJSON(data_type_,
R"([
[null],
[[1.0, 1.0, 1.0, 1.0]],
[[0.0, 1.0, 0.0, 1.0]]
])")
.ValueOrDie();

auto global_index = std::make_shared<LuminaGlobalIndex>(options_);
auto path_factory = std::make_shared<FakeIndexPathFactory>(test_root);
auto file_writer = std::make_shared<GlobalIndexFileManager>(fs_, path_factory);

ASSERT_OK_AND_ASSIGN(
std::shared_ptr<GlobalIndexWriter> global_writer,
global_index->CreateWriter("f0", CreateArrowSchema(data_type_).get(), file_writer, pool_));

// AddBatch 1: row_ids {0, 1, 2}
{
ArrowArray c_array;
ASSERT_TRUE(arrow::ExportArray(*batch1, &c_array).ok());
std::vector<int64_t> row_ids = {0, 1, 2};
ASSERT_OK(global_writer->AddBatch(&c_array, std::move(row_ids)));
}
// AddBatch 2: row_ids {3, 4, 5}
{
ArrowArray c_array;
ASSERT_TRUE(arrow::ExportArray(*batch2, &c_array).ok());
std::vector<int64_t> row_ids = {3, 4, 5};
ASSERT_OK(global_writer->AddBatch(&c_array, std::move(row_ids)));
}

ASSERT_OK_AND_ASSIGN(auto result_metas, global_writer->Finish());
ASSERT_EQ(result_metas.size(), 1);

ASSERT_OK_AND_ASSIGN(auto reader,
CreateGlobalIndexReader(test_root, data_type_, options_, result_metas[0]));
{
// Search all: should return ids {0, 2, 4, 5}, never {1, 3}
ASSERT_OK_AND_ASSIGN(
auto scored_result,
reader->VisitVectorSearch(std::make_shared<VectorSearch>(
/*field_name=*/"f0", /*limit=*/10, query_, /*filter=*/nullptr,
/*predicate=*/nullptr, /*distance_type=*/std::nullopt, /*options=*/options_)));
// id 0: [0,0,0,0] → L2 dist to [1,1,1,1.1] = 4.21
// id 2: [1,0,1,0] → L2 dist = 2.21
// id 4: [1,1,1,1] → L2 dist = 0.01
// id 5: [0,1,0,1] → L2 dist = 2.01
CheckResult(scored_result, {4l, 5l, 2l, 0l}, {0.01f, 2.01f, 2.21f, 4.21f});
}
}

} // namespace paimon::lumina::test
Loading
Loading