Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/paimon/defs.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,8 @@ struct PAIMON_EXPORT Options {
/// "blob-as-descriptor" - Read and write blob field using blob descriptor rather than blob
/// bytes. Default value is "false".
static const char BLOB_AS_DESCRIPTOR[];
/// "global-index.enabled" - Whether to enable global index for scan. Default value is "true".
static const char GLOBAL_INDEX_ENABLED[];
};

static constexpr int64_t BATCH_WRITE_COMMIT_IDENTIFIER = std::numeric_limits<int64_t>::max();
Expand Down
6 changes: 3 additions & 3 deletions include/paimon/global_index/bitmap_global_index_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ class PAIMON_EXPORT BitmapGlobalIndexResult : public GlobalIndexResult {
/// bitmap is not actually required. **Not thread-safe**.
Result<const RoaringBitmap64*> GetBitmap() const;

/// @return A shared pointer to a new BitmapGlobalIndexResult instance representing the given
/// inclusive range [from, to].
static std::shared_ptr<BitmapGlobalIndexResult> FromRange(const Range& range);
/// Creates `BitmapGlobalIndexResult` for all row ids in the given ranges.
/// @note Overlapping or unsorted ranges are accepted.
static std::shared_ptr<BitmapGlobalIndexResult> FromRanges(const std::vector<Range>& ranges);

private:
mutable bool initialized_ = false;
Expand Down
4 changes: 4 additions & 0 deletions include/paimon/global_index/global_index_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "paimon/memory/bytes.h"
#include "paimon/memory/memory_pool.h"
#include "paimon/result.h"
#include "paimon/utils/range.h"
#include "paimon/visibility.h"

namespace paimon {
Expand Down Expand Up @@ -55,6 +56,9 @@ class PAIMON_EXPORT GlobalIndexResult : public std::enable_shared_from_this<Glob
/// Creates a new iterator over the selected global row ids.
virtual Result<std::unique_ptr<Iterator>> CreateIterator() const = 0;

/// Returns non-overlapping, sorted ranges covering all row ids in `GlobalIndexResult`.
Result<std::vector<Range>> ToRanges() const;

/// Computes the logical AND (intersection) between current result and another.
virtual Result<std::shared_ptr<GlobalIndexResult>> And(
const std::shared_ptr<GlobalIndexResult>& other);
Expand Down
12 changes: 11 additions & 1 deletion include/paimon/global_index/global_index_scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class PAIMON_EXPORT GlobalIndexScan {
///
/// @param table_path Root directory of the table.
/// @param snapshot_id Optional snapshot ID to read from; if not provided, uses the latest.
/// @param partitions Optional list of partition specs to restrict the scan scope.
/// @param partitions Optional list of specific partitions to restrict the scan scope.
/// Each map represents one partition (e.g., {"dt": "2024-06-01"}).
/// If omitted, scans all partitions.
/// @param options Index-specific configuration.
Expand All @@ -53,6 +53,16 @@ class PAIMON_EXPORT GlobalIndexScan {
const std::map<std::string, std::string>& options,
const std::shared_ptr<FileSystem>& file_system, const std::shared_ptr<MemoryPool>& pool);

/// Creates a `GlobalIndexScan` instance for the specified table and context.
///
/// @param partition_filters Optional specific partition predicates.
static Result<std::unique_ptr<GlobalIndexScan>> Create(
const std::string& root_path, const std::optional<int64_t>& snapshot_id,
const std::shared_ptr<Predicate>& partition_filters,
const std::map<std::string, std::string>& options,
const std::shared_ptr<FileSystem>& file_system,
const std::shared_ptr<MemoryPool>& memory_pool);

virtual ~GlobalIndexScan() = default;

/// Creates a scanner for the global index over the specified row ID range.
Expand Down
27 changes: 13 additions & 14 deletions include/paimon/scan_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,11 @@
#include <string>
#include <vector>

#include "paimon/global_index/global_index_result.h"
#include "paimon/predicate/predicate.h"
#include "paimon/result.h"
#include "paimon/type_fwd.h"
#include "paimon/utils/range.h"
#include "paimon/visibility.h"

namespace paimon {
class ScanContextBuilder;
class ScanFilter;
Expand All @@ -45,6 +44,7 @@ class PAIMON_EXPORT ScanContext {
public:
ScanContext(const std::string& path, bool is_streaming_mode, std::optional<int32_t> limit,
const std::shared_ptr<ScanFilter>& scan_filter,
const std::shared_ptr<GlobalIndexResult>& global_index_result,
const std::shared_ptr<MemoryPool>& memory_pool,
const std::shared_ptr<Executor>& executor,
const std::map<std::string, std::string>& options);
Expand Down Expand Up @@ -77,12 +77,16 @@ class PAIMON_EXPORT ScanContext {
std::shared_ptr<Executor> GetExecutor() const {
return executor_;
}
std::shared_ptr<GlobalIndexResult> GetGlobalIndexResult() const {
return global_index_result_;
}

private:
std::string path_;
bool is_streaming_mode_;
std::optional<int32_t> limit_;
std::shared_ptr<ScanFilter> scan_filters_;
std::shared_ptr<GlobalIndexResult> global_index_result_;
std::shared_ptr<MemoryPool> memory_pool_;
std::shared_ptr<Executor> executor_;
std::map<std::string, std::string> options_;
Expand All @@ -93,11 +97,10 @@ class PAIMON_EXPORT ScanFilter {
public:
ScanFilter(const std::shared_ptr<Predicate>& predicate,
const std::vector<std::map<std::string, std::string>>& partition_filters,
const std::optional<int32_t>& bucket_filter, const std::vector<Range>& row_ranges)
const std::optional<int32_t>& bucket_filter)
: predicates_(predicate),
bucket_filter_(bucket_filter),
partition_filters_(partition_filters),
row_ranges_(row_ranges) {}
partition_filters_(partition_filters) {}

std::shared_ptr<Predicate> GetPredicate() const {
return predicates_;
Expand All @@ -109,15 +112,10 @@ class PAIMON_EXPORT ScanFilter {
return partition_filters_;
}

const std::vector<Range>& GetRowRanges() const {
return row_ranges_;
}

private:
std::shared_ptr<Predicate> predicates_;
std::optional<int32_t> bucket_filter_;
std::vector<std::map<std::string, std::string>> partition_filters_;
std::vector<Range> row_ranges_;
};

/// `ScanContextBuilder` used to build a `ScanContext`, has input validation.
Expand All @@ -138,10 +136,11 @@ class PAIMON_EXPORT ScanContextBuilder {
/// Set a predicate for filtering data.
ScanContextBuilder& SetPredicate(const std::shared_ptr<Predicate>& predicate);

/// Specify the row id ranges for scan. This is usually used to read specific rows in
/// data-evolution mode. File ranges that do not have any intersection with range_ids will be
/// filtered. If not set, all rows are returned
ScanContextBuilder& SetRowRanges(const std::vector<Range>& row_ranges);
/// Sets the result of a global index search (e.g., row ids (may with scores) from a distributed
/// index lookup). This is used to push down index-filtered row ids into the scan for efficient
/// data retrieval.
ScanContextBuilder& SetGlobalIndexResult(
const std::shared_ptr<GlobalIndexResult>& global_index_result);
/// The options added or set in `ScanContextBuilder` have high priority and will be merged with
/// the options in table schema.
ScanContextBuilder& AddOption(const std::string& key, const std::string& value);
Expand Down
9 changes: 9 additions & 0 deletions include/paimon/utils/range.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,15 @@ struct PAIMON_EXPORT Range {
/// Computes the set intersection of two collections of disjoint, sorted ranges.
static std::vector<Range> And(const std::vector<Range>& left, const std::vector<Range>& right);

/// Excludes the given ranges from this range and returns the remaining ranges.
///
/// For example, if this range is [0, 10000] and ranges to exclude are [1000, 2000], [3000,
/// 4000], [5000, 6000], then the result is [0, 999], [2001, 2999], [4001, 4999], [6001, 10000].
///
/// @param ranges The ranges to exclude (can be unsorted and overlapping).
/// @return The remaining ranges after exclusion.
std::vector<Range> Exclude(const std::vector<Range>& ranges) const;

bool operator==(const Range& other) const;
bool operator<(const Range& other) const;

Expand Down
1 change: 1 addition & 0 deletions src/paimon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ set(PAIMON_CORE_SRCS
core/table/source/startup_mode.cpp
core/table/source/table_read.cpp
core/table/source/table_scan.cpp
core/table/source/data_evolution_batch_scan.cpp
core/utils/field_mapping.cpp
core/utils/fields_comparator.cpp
core/utils/file_store_path_factory.cpp
Expand Down
2 changes: 1 addition & 1 deletion src/paimon/common/defs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ const char Options::ROW_TRACKING_ENABLED[] = "row-tracking.enabled";
const char Options::DATA_EVOLUTION_ENABLED[] = "data-evolution.enabled";
const char Options::PARTITION_GENERATE_LEGACY_NAME[] = "partition.legacy-name";
const char Options::BLOB_AS_DESCRIPTOR[] = "blob-as-descriptor";

const char Options::GLOBAL_INDEX_ENABLED[] = "global-index.enabled";
} // namespace paimon
9 changes: 6 additions & 3 deletions src/paimon/common/global_index/bitmap_global_index_result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,13 @@ std::string BitmapGlobalIndexResult::ToString() const {
return bitmap.value()->ToString();
}

std::shared_ptr<BitmapGlobalIndexResult> BitmapGlobalIndexResult::FromRange(const Range& range) {
BitmapGlobalIndexResult::BitmapSupplier supplier = [range]() -> Result<RoaringBitmap64> {
std::shared_ptr<BitmapGlobalIndexResult> BitmapGlobalIndexResult::FromRanges(
const std::vector<Range>& ranges) {
BitmapGlobalIndexResult::BitmapSupplier supplier = [ranges]() -> Result<RoaringBitmap64> {
RoaringBitmap64 bitmap;
bitmap.AddRange(range.from, range.to + 1);
for (const auto& range : ranges) {
bitmap.AddRange(range.from, range.to + 1);
}
return bitmap;
};
return std::make_shared<BitmapGlobalIndexResult>(supplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,18 @@ TEST_F(BitmapGlobalIndexResultTest, TestInvalidBitmapResult) {
ASSERT_TRUE(result->ToString().find("Invalid: invalid supplier") != std::string::npos);
}

TEST_F(BitmapGlobalIndexResultTest, TestFromRange) {
TEST_F(BitmapGlobalIndexResultTest, TestFromRanges) {
{
auto result = BitmapGlobalIndexResult::FromRange(Range(0, 5));
auto result = BitmapGlobalIndexResult::FromRanges({Range(0, 5)});
ASSERT_EQ(result->ToString(), "{0,1,2,3,4,5}");
}
{
auto result = BitmapGlobalIndexResult::FromRange(Range(10, 10));
auto result = BitmapGlobalIndexResult::FromRanges({Range(10, 10)});
ASSERT_EQ(result->ToString(), "{10}");
}
{
auto result = BitmapGlobalIndexResult::FromRanges({Range(0, 5), Range(10, 10)});
ASSERT_EQ(result->ToString(), "{0,1,2,3,4,5,10}");
}
}
} // namespace paimon::test
25 changes: 25 additions & 0 deletions src/paimon/common/global_index/global_index_result.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,4 +133,29 @@ Result<std::shared_ptr<GlobalIndexResult>> GlobalIndexResult::Deserialize(
return std::make_shared<BitmapTopKGlobalIndexResult>(std::move(bitmap), std::move(scores));
}

Result<std::vector<Range>> GlobalIndexResult::ToRanges() const {
std::vector<Range> ranges;
PAIMON_ASSIGN_OR_RAISE(bool empty, IsEmpty());
if (empty) {
return ranges;
}
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<Iterator> iter, CreateIterator());
int64_t range_start = iter->Next();
int64_t range_end = range_start;
while (iter->HasNext()) {
int64_t current = iter->Next();
if (current == range_end + 1) {
// Extend the current range
range_end = current;
} else {
ranges.push_back(Range(range_start, range_end));
range_start = current;
range_end = current;
}
}
// Add the last range
ranges.push_back(Range(range_start, range_end));
return ranges;
}

} // namespace paimon
7 changes: 7 additions & 0 deletions src/paimon/common/global_index/global_index_result_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,16 @@ TEST_F(GlobalIndexResultTest, TestSimple) {
std::make_shared<FakeGlobalIndexResult>(std::vector<int64_t>({100, 5, 4, 3, 200}));
ASSERT_OK_AND_ASSIGN(auto and_result, result1->And(result2));
ASSERT_EQ(and_result->ToString(), "{3,5,100}");
ASSERT_OK_AND_ASSIGN(auto and_ranges, and_result->ToRanges());
std::vector<Range> expect_and_ranges = {Range(3, 3), Range(5, 5), Range(100, 100)};
ASSERT_EQ(and_ranges, expect_and_ranges);

ASSERT_OK_AND_ASSIGN(auto or_result, result1->Or(result2));
ASSERT_EQ(or_result->ToString(), "{1,3,4,5,100,200}");
ASSERT_OK_AND_ASSIGN(auto or_ranges, or_result->ToRanges());
std::vector<Range> expect_or_ranges = {Range(1, 1), Range(3, 5), Range(100, 100),
Range(200, 200)};
ASSERT_EQ(or_ranges, expect_or_ranges);
}

TEST_F(GlobalIndexResultTest, TestSerializeAndDeserializeSimple) {
Expand Down
37 changes: 37 additions & 0 deletions src/paimon/common/utils/range.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,43 @@ bool Range::HasIntersection(const Range& left, const Range& right) {
return intersection_start <= intersection_end;
}

std::vector<Range> Range::Exclude(const std::vector<Range>& ranges) const {
if (ranges.empty()) {
return {*this};
}

// Sort ranges by from
std::vector<Range> sorted = ranges;
std::sort(sorted.begin(), sorted.end(),
[](const Range& left, const Range& right) { return left.from < right.from; });

std::vector<Range> result;
int64_t current = from;

for (const auto& exclude : sorted) {
// Compute intersection with the current range
auto intersect = Range::Intersection(Range(current, to), exclude);
if (!intersect) {
continue;
}
// Add the part before the intersection (if any)
if (current < intersect.value().from) {
result.push_back(Range(current, intersect.value().from - 1));
}
// Move current position past the intersection
current = intersect.value().to + 1;
if (current > to) {
break;
}
}
// Add the remaining part after all exclusions (if any)
if (current <= to) {
result.push_back(Range(current, to));
}

return result;
}

bool Range::operator==(const Range& other) const {
if (this == &other) {
return true;
Expand Down
Loading
Loading