Skip to content

Commit 59ca294

Browse files
authored
feat: support specific fs in ReadContext & options in VectorSearch (#57)
* feat: support specific fs in ReadContext & update lumina lib
* feat: support user-defined options in VectorSearch
1 parent 67c660c commit 59ca294

38 files changed

Lines changed: 748 additions & 245 deletions

include/paimon/defs.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ struct PAIMON_EXPORT Options {
105105
/// Default value is local.
106106
static const char FILE_SYSTEM[];
107107

108-
/// "target-file-size" - Target size of a file. Default value is 128MB.
109-
// TODO(yonghao.fyh): xinyu, change the default value to 128MB for primary key table.
108+
/// "target-file-size" - Target size of a file. Default value is 256MB.
109+
// TODO(xinyu.lxy): change the default value to 128MB for primary key table.
110110
static const char TARGET_FILE_SIZE[];
111111

112112
/// "blob.target-file-size" - Target size of a blob file. Default is TARGET_FILE_SIZE.

include/paimon/global_index/io/global_index_file_reader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class PAIMON_EXPORT GlobalIndexFileReader {
3030

3131
/// Opens an input stream for reading the specified global index file.
3232
virtual Result<std::unique_ptr<InputStream>> GetInputStream(
33-
const std::string& file_name) const = 0;
33+
const std::string& file_path) const = 0;
3434
};
3535

3636
} // namespace paimon

include/paimon/predicate/vector_search.h

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
#pragma once
1818
#include <functional>
19+
#include <map>
1920
#include <memory>
21+
#include <optional>
2022
#include <string>
2123
#include <vector>
2224

@@ -35,16 +37,24 @@ struct PAIMON_EXPORT VectorSearch {
3537
/// @note Must be thread-safe.
3638
using PreFilter = std::function<bool(int64_t)>;
3739

40+
/// Enumeration of distance or similarity metrics for vector comparison.
41+
enum class DistanceType { EUCLIDEAN = 1, INNER_PRODUCT = 2, COSINE = 3, UNKNOWN = 128 };
42+
3843
VectorSearch(const std::string& _field_name, int32_t _limit, const std::vector<float>& _query,
39-
PreFilter _pre_filter, const std::shared_ptr<Predicate>& _predicate)
44+
PreFilter _pre_filter, const std::shared_ptr<Predicate>& _predicate,
45+
const std::optional<DistanceType>& _distance_type,
46+
const std::map<std::string, std::string>& _options)
4047
: field_name(_field_name),
4148
limit(_limit),
4249
query(_query),
4350
pre_filter(_pre_filter),
44-
predicate(_predicate) {}
51+
predicate(_predicate),
52+
distance_type(_distance_type),
53+
options(_options) {}
4554

4655
std::shared_ptr<VectorSearch> ReplacePreFilter(PreFilter _pre_filter) const {
47-
return std::make_shared<VectorSearch>(field_name, limit, query, _pre_filter, predicate);
56+
return std::make_shared<VectorSearch>(field_name, limit, query, _pre_filter, predicate,
57+
distance_type, options);
4858
}
4959

5060
/// Search field name.
@@ -65,5 +75,13 @@ struct PAIMON_EXPORT VectorSearch {
6575
/// @note All fields referenced in the predicate must have been materialized
6676
/// in the index during build to ensure availability.
6777
std::shared_ptr<Predicate> predicate;
78+
/// The distance metric to use for this query, if explicitly specified.
79+
/// If set, this value must match the distance type used by the index (e.g., EUCLIDEAN, COSINE).
80+
/// A mismatch will result in an error during query execution.
81+
/// If not set (std::nullopt), the query will use the distance type configured in the index.
82+
std::optional<DistanceType> distance_type;
83+
/// A key-value map of query-specific runtime options.
84+
/// For example, the candidate list size in approximate search or the query's parallelism.
85+
std::map<std::string, std::string> options;
6886
};
6987
} // namespace paimon

include/paimon/read_context.h

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ namespace paimon {
3232
class Executor;
3333
class MemoryPool;
3434
class Predicate;
35+
class FileSystem;
3536

3637
/// `ReadContext` holds the configuration for read operations.
3738
///
@@ -48,6 +49,7 @@ class PAIMON_EXPORT ReadContext {
4849
uint32_t row_to_batch_thread_number, const std::optional<std::string>& table_schema,
4950
const std::shared_ptr<MemoryPool>& memory_pool,
5051
const std::shared_ptr<Executor>& executor,
52+
const std::shared_ptr<FileSystem>& specific_file_system,
5153
const std::map<std::string, std::string>& fs_scheme_to_identifier_map,
5254
const std::map<std::string, std::string>& options);
5355
~ReadContext();
@@ -103,6 +105,9 @@ class PAIMON_EXPORT ReadContext {
103105
std::shared_ptr<Executor> GetExecutor() const {
104106
return executor_;
105107
}
108+
std::shared_ptr<FileSystem> GetSpecificFileSystem() const {
109+
return specific_file_system_;
110+
}
106111

107112
private:
108113
std::string path_;
@@ -118,6 +123,7 @@ class PAIMON_EXPORT ReadContext {
118123
std::optional<std::string> table_schema_;
119124
std::shared_ptr<MemoryPool> memory_pool_;
120125
std::shared_ptr<Executor> executor_;
126+
std::shared_ptr<FileSystem> specific_file_system_;
121127
std::map<std::string, std::string> fs_scheme_to_identifier_map_;
122128
std::map<std::string, std::string> options_;
123129
};
@@ -258,15 +264,29 @@ class PAIMON_EXPORT ReadContextBuilder {
258264
/// @note Default branch is "main" if not specified.
259265
ReadContextBuilder& WithBranch(const std::string& branch);
260266

261-
/// Set the file system scheme to identifier mapping for custom file system configurations.
262-
/// This allows using different file system implementations for different URI schemes.
267+
/// Sets a mapping from URI schemes (e.g., "file", "oss") to registered file system
268+
/// identifiers. This allows selecting different pre-registered file system implementations
269+
/// based on the URI scheme at runtime.
263270
///
264-
/// @param fs_scheme_to_identifier_map Map from URI scheme to file system identifier.
271+
/// @param fs_scheme_to_identifier_map Map from URI scheme (like "oss") to the corresponding
272+
/// file system identifier.
265273
/// @return Reference to this builder for method chaining.
266-
/// @note If not set, use default file system (configured in `Options::FILE_SYSTEM`).
274+
/// @note
275+
/// - This method is intended for environments where multiple file systems are pre-registered.
276+
/// - The specified identifiers must correspond to file systems that have been registered at
277+
/// compile time or initialization.
278+
/// - Cannot be used together with `WithFileSystem()`.
279+
/// - If not set, the default file system (configured in `Options::FILE_SYSTEM`) is used.
280+
/// Example:
281+
/// builder.WithFileSystemSchemeToIdentifierMap({{"oss", "jindo"}, {"file", "local"}});
282+
///
267283
ReadContextBuilder& WithFileSystemSchemeToIdentifierMap(
268284
const std::map<std::string, std::string>& fs_scheme_to_identifier_map);
269285

286+
/// Sets a custom file system instance to be used for all file operations in this read context.
287+
/// This bypasses the global file system registry and uses the provided implementation directly.
288+
ReadContextBuilder& WithFileSystem(const std::shared_ptr<FileSystem>& file_system);
289+
270290
/// Build and return a `ReadContext` instance with input validation.
271291
/// @return Result containing the constructed `ReadContext` or an error status.
272292
Result<std::unique_ptr<ReadContext>> Finish();

src/paimon/common/global_index/bitmap/bitmap_global_index_test.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,8 @@ TEST_F(BitmapGlobalIndexTest, TestStringType) {
219219

220220
// test visit vector search
221221
ASSERT_NOK_WITH_MSG(reader->VisitVectorSearch(std::make_shared<VectorSearch>(
222-
"f0", 10, std::vector<float>({1.0f, 2.0f}), nullptr, nullptr)),
222+
"f0", 10, std::vector<float>({1.0f, 2.0f}), nullptr, nullptr,
223+
std::nullopt, std::map<std::string, std::string>())),
223224
"FileIndexReaderWrapper is not supposed to handle vector search query");
224225
};
225226

src/paimon/core/operation/internal_read_context.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ Result<std::unique_ptr<InternalReadContext>> InternalReadContext::Create(
3434
const std::map<std::string, std::string>& options) {
3535
PAIMON_ASSIGN_OR_RAISE(
3636
CoreOptions core_options,
37-
CoreOptions::FromMap(options, context->GetFileSystemSchemeToIdentifierMap()));
37+
CoreOptions::FromMap(options, context->GetFileSystemSchemeToIdentifierMap(),
38+
context->GetSpecificFileSystem()));
3839
// prepare read schema
3940
std::vector<DataField> read_data_fields;
4041
read_data_fields.reserve(context->GetReadSchema().size());

src/paimon/core/operation/read_context.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ ReadContext::ReadContext(const std::string& path, const std::string& branch,
3636
const std::optional<std::string>& table_schema,
3737
const std::shared_ptr<MemoryPool>& memory_pool,
3838
const std::shared_ptr<Executor>& executor,
39+
const std::shared_ptr<FileSystem>& specific_file_system,
3940
const std::map<std::string, std::string>& fs_scheme_to_identifier_map,
4041
const std::map<std::string, std::string>& options)
4142
: path_(path),
@@ -51,6 +52,7 @@ ReadContext::ReadContext(const std::string& path, const std::string& branch,
5152
table_schema_(table_schema),
5253
memory_pool_(memory_pool),
5354
executor_(executor),
55+
specific_file_system_(specific_file_system),
5456
fs_scheme_to_identifier_map_(fs_scheme_to_identifier_map),
5557
options_(options) {}
5658

@@ -74,6 +76,7 @@ class ReadContextBuilder::Impl {
7476
table_schema_ = std::nullopt;
7577
memory_pool_ = GetDefaultPool();
7678
executor_.reset();
79+
specific_file_system_.reset();
7780
}
7881

7982
private:
@@ -92,6 +95,7 @@ class ReadContextBuilder::Impl {
9295
std::optional<std::string> table_schema_;
9396
std::shared_ptr<MemoryPool> memory_pool_ = GetDefaultPool();
9497
std::shared_ptr<Executor> executor_;
98+
std::shared_ptr<FileSystem> specific_file_system_;
9599
};
96100

97101
ReadContextBuilder::ReadContextBuilder(const std::string& path)
@@ -180,6 +184,12 @@ ReadContextBuilder& ReadContextBuilder::WithFileSystemSchemeToIdentifierMap(
180184
return *this;
181185
}
182186

187+
ReadContextBuilder& ReadContextBuilder::WithFileSystem(
188+
const std::shared_ptr<FileSystem>& file_system) {
189+
impl_->specific_file_system_ = file_system;
190+
return *this;
191+
}
192+
183193
Result<std::unique_ptr<ReadContext>> ReadContextBuilder::Finish() {
184194
PAIMON_ASSIGN_OR_RAISE(impl_->path_, PathUtil::NormalizePath(impl_->path_));
185195
if (impl_->path_.empty()) {
@@ -207,7 +217,8 @@ Result<std::unique_ptr<ReadContext>> ReadContextBuilder::Finish() {
207217
impl_->enable_predicate_filter_, impl_->enable_prefetch_, impl_->prefetch_batch_count_,
208218
impl_->prefetch_max_parallel_num_, impl_->enable_multi_thread_row_to_batch_,
209219
impl_->row_to_batch_thread_number_, impl_->table_schema_, impl_->memory_pool_,
210-
impl_->executor_, impl_->fs_scheme_to_identifier_map_, impl_->options_);
220+
impl_->executor_, impl_->specific_file_system_, impl_->fs_scheme_to_identifier_map_,
221+
impl_->options_);
211222
impl_->Reset();
212223
return ctx;
213224
}

src/paimon/core/operation/read_context_test.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "paimon/defs.h"
2323
#include "paimon/predicate/predicate_builder.h"
2424
#include "paimon/status.h"
25+
#include "paimon/testing/mock/mock_file_system.h"
2526
#include "paimon/testing/utils/testharness.h"
2627

2728
namespace paimon::test {
@@ -42,6 +43,7 @@ TEST(ReadContextTest, TestSimple) {
4243
ASSERT_EQ(1, ctx->GetRowToBatchThreadNumber());
4344
ASSERT_EQ("main", ctx->GetBranch());
4445
ASSERT_TRUE(ctx->GetFileSystemSchemeToIdentifierMap().empty());
46+
ASSERT_FALSE(ctx->GetSpecificFileSystem());
4547
}
4648

4749
TEST(ReadContextTest, TestSetContent) {
@@ -59,6 +61,8 @@ TEST(ReadContextTest, TestSetContent) {
5961
builder.SetRowToBatchThreadNumber(9);
6062
builder.WithBranch("rt");
6163
builder.WithFileSystemSchemeToIdentifierMap({{"file", "local"}});
64+
auto fs = std::make_shared<MockFileSystem>();
65+
builder.WithFileSystem(fs);
6266
ASSERT_OK_AND_ASSIGN(auto ctx, builder.Finish());
6367

6468
// test result
@@ -78,6 +82,7 @@ TEST(ReadContextTest, TestSetContent) {
7882
ASSERT_EQ(expected_fs_map, ctx->GetFileSystemSchemeToIdentifierMap());
7983
std::map<std::string, std::string> expected_options = {{"key", "value"}};
8084
ASSERT_EQ(expected_options, ctx->GetOptions());
85+
ASSERT_EQ(ctx->GetSpecificFileSystem(), fs);
8186
}
8287

8388
} // namespace paimon::test

src/paimon/core/operation/scan_context_test.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ TEST(ScanContextTest, TestSetFilter) {
5050
builder.SetPredicate(predicate);
5151
std::vector<float> query = {1.0, 2.0};
5252
VectorSearch::PreFilter pre_filter = [](int64_t id) -> bool { return id % 2; };
53-
builder.SetVectorSearch(std::make_shared<VectorSearch>("f0", 10, query, pre_filter, nullptr));
53+
builder.SetVectorSearch(std::make_shared<VectorSearch>(
54+
"f0", 10, query, pre_filter, nullptr, VectorSearch::DistanceType::INNER_PRODUCT,
55+
std::map<std::string, std::string>()));
5456
std::vector<Range> row_ranges = {Range(1, 2), Range(4, 5)};
5557
auto global_index_result = BitmapGlobalIndexResult::FromRanges(row_ranges);
5658
builder.SetGlobalIndexResult(global_index_result);
@@ -67,6 +69,8 @@ TEST(ScanContextTest, TestSetFilter) {
6769
auto result_vector_search = ctx->GetScanFilters()->GetVectorSearch();
6870
ASSERT_TRUE(result_vector_search);
6971
ASSERT_EQ(query, result_vector_search->query);
72+
ASSERT_EQ(VectorSearch::DistanceType::INNER_PRODUCT,
73+
result_vector_search->distance_type.value());
7074
ASSERT_EQ(partition_filters, ctx->GetScanFilters()->GetPartitionFilters());
7175
ASSERT_EQ("{1,2,4,5}", ctx->GetGlobalIndexResult()->ToString());
7276
std::map<std::string, std::string> expected_options = {{"key", "value"}};

src/paimon/core/table/source/table_read.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ Result<std::unique_ptr<InternalReadContext>> CreateInternalReadContext(
5757
} else {
5858
PAIMON_ASSIGN_OR_RAISE(
5959
CoreOptions tmp_core_options,
60-
CoreOptions::FromMap(tmp_options, context->GetFileSystemSchemeToIdentifierMap()));
60+
CoreOptions::FromMap(tmp_options, context->GetFileSystemSchemeToIdentifierMap(),
61+
context->GetSpecificFileSystem()));
6162
SchemaManager schema_manager(tmp_core_options.GetFileSystem(), context->GetPath(), branch);
6263
PAIMON_ASSIGN_OR_RAISE(std::optional<std::shared_ptr<TableSchema>> latest_schema,
6364
schema_manager.Latest());

0 commit comments

Comments
 (0)