Conversation

@Jay-ju Jay-ju commented Sep 16, 2025

perf(execution): optimize fragment scheduling with pre-filtering

Reduce empty task overhead in distributed engines (Ray/Daft/Spark):

  • Implement fragment-level filtering in get_fragment interface
  • Skip scheduling fragments with zero matching rows
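A minimal usage sketch of what the change enables, assuming (per the branch name and the description above, not confirmed elsewhere on this page) that get_fragments gains an optional filter argument; the URI and filter string are placeholders:

import lance

# Hedged sketch only: the optional `filter` argument on get_fragments is
# what this PR proposes; it does not exist in released lance.
ds = lance.dataset("s3://bucket/table.lance")  # placeholder URI

# Only fragments that contain at least one row matching the filter are
# returned, so the distributed engine never schedules empty read tasks.
fragments = ds.get_fragments(filter="date = '2025-09-16'")

for frag in fragments:
    # Each surviving fragment becomes one task; the worker applies the same
    # filter via the existing fragment.scanner(filter=...) API.
    print(frag.fragment_id)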

@github-actions bot added the enhancement (New feature or request) and python labels on Sep 16, 2025
@Jay-ju force-pushed the get-fragments-filter-support branch 2 times, most recently from c1cc9cc to 018744e on September 16, 2025 at 05:52
@Jay-ju force-pushed the get-fragments-filter-support branch from 018744e to 8e80d49 on September 16, 2025 at 09:21
(Excerpt of the diff under review:)

    - Use fragment.scanner(filter=filter) to get filtered data from
      individual fragments
    """
    # Get all fragments first (same as the original implementation)
Contributor
I don't quite understand; doesn't this mean you are not really improving things from a perf perspective? It would also be quite expensive to check every fragment against the filter. We have added the zone map now, so I think we should select fragments (or even zones, which would be better) to distribute based on the zone map index.

Contributor Author
@jackye1995 Actually, this is a tradeoff. When the number of fragments is large it causes problems: for example, when a dataset has more than 20,000 fragments, and because the dataset currently caches the manifest by default, distributing physical plan tasks incurs significant overhead every time.

Therefore, the idea here is to filter down to the valid fragments first; the filter can still be left unset (it is optional).

Contributor
oh I am all for adding this filter. To be more specific, I am saying:

filtered_fragments = [
    fragment
    for fragment in all_fragments
    if fragment.count_rows(filter) > 0
]

seems expensive since you are running a count_rows per fragment.

I think we can expose a coarse-grained plan_fragments API that plans the fragments to distribute based on a filter, and we can apply that filter once against the zone map or bloom filter index.
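To make that concrete, here is a minimal sketch of what such an API could look like; plan_fragments does not exist in lance today, and the signature and the conservative fallback are illustrative assumptions only:

from typing import List, Optional

import lance
from lance.fragment import LanceFragment


def plan_fragments(ds: lance.LanceDataset, filter: Optional[str] = None) -> List[LanceFragment]:
    """Illustrative stub: return the fragments that *might* match `filter`.

    In the proposed design the filter is evaluated once against inexact
    indexes (zone map / bloom filter) instead of scanning each fragment.
    With no usable index, returning every fragment is the conservative,
    correct fallback ("all fragments might match").
    """
    all_fragments = ds.get_fragments()
    if filter is None:
        return all_fragments
    # TODO: consult zone-map / bloom-filter indexes here and drop fragments
    # that provably contain no matching rows.
    return all_fragments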

Contributor Author

@Jay-ju Jay-ju Sep 23, 2025
@jackye1995 So what you mean is: after a zone-map index is created on the dataset, dataset.scanner can use the index, but fragment.scanner cannot, so we need to make the fragment path use the index as well? Is my understanding correct?

Contributor Author

@Jay-ju Jay-ju Sep 23, 2025
fragment.scanner should also be able to use ScalarIndexQuery, but in my test the time taken by the fragment's count_rows is indeed much worse than that of the dataset's count_rows. I guess it is due to multi-threaded operations.
(screenshot of the timing comparison attached)
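For reference, a rough way to reproduce that comparison (dataset path and filter are placeholders; timings will of course vary):

import time

import lance

ds = lance.dataset("/tmp/bench.lance")  # placeholder path
filt = "id > 1000"

t0 = time.perf_counter()
total_ds = ds.count_rows(filter=filt)           # single dataset-level count
t1 = time.perf_counter()
total_frag = sum(f.count_rows(filter=filt)      # one count per fragment
                 for f in ds.get_fragments())
t2 = time.perf_counter()

# The two totals should agree; the observation above is that the
# per-fragment path is much slower than the dataset-level count.
print(f"dataset.count_rows: {total_ds} rows in {t1 - t0:.3f}s")
print(f"per-fragment sum:   {total_frag} rows in {t2 - t1:.3f}s")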

Contributor

@jackye1995 jackye1995 Sep 24, 2025
Maybe we need to align the goal here first. In my understanding, we want to use get_fragments with a filter because of use cases like https://github.com/lancedb/lance-ray/blame/main/lance_ray/io.py#L265.

So what happens is that you (1) get the list of fragments to distribute, and then (2) process each fragment in worker.

In step 1 you don't necessarily need an exact match; you just want to prune down the list of fragments that might match the filter. In many cases this can already discard a large portion of the fragments, and the actual filtering can happen in the worker.

That is why I think using inexact indexes like the zone_map index and bloom filter index is ideal here: we would be able to very quickly answer the question "which fragments might match the filter" and then distribute the work. Then in each worker, you can push down the filter and do the exact scan filtering there. If there are exact indexes like btree or bitmap we can still use them; they could be slower but still okay.
If no index exists against the filtered columns, we can just return the full list of fragments, which means all fragments might match the filter.

Compared to that, what is done in the current code is very heavyweight. You are doing an exact count, which actually triggers a scan for each fragment, and you are doing it for each fragment separately, which is even worse. I don't think that really achieves the goal of "reduce empty task overhead in distributed engines (Ray/Daft/Spark)", because you are doing much more compute in the coordinator.

I guess maybe from an implementation perspective, get_fragments should do what you have implemented. But for the purpose of reducing empty task overhead, we might want to create another method like prune_fragments.
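Roughly, the split being described (a hedged sketch; prune_fragments is only proposed and does not exist yet, so the coordinator side below falls back to the existing get_fragments):

import lance

ds_uri = "s3://bucket/table.lance"  # placeholder URI
filt = "col1 = 42"

# Phase 1 (coordinator): cheap, inexact pruning. Until something like
# prune_fragments exists, returning every fragment is the conservative
# fallback; with a zone-map / bloom-filter index this list would shrink.
candidate_fragments = lance.dataset(ds_uri).get_fragments()


# Phase 2 (worker): exact filtering, pushed down into the per-fragment scan.
def read_fragment(fragment_id: int, filter_expr: str):
    frag = lance.dataset(ds_uri).get_fragment(fragment_id)
    return frag.scanner(filter=filter_expr).to_table()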

What do you think?

Contributor Author
Hmm, I agree on the goal. My scenario is indeed that the driver uses a coarse-grained index and the worker performs the actual filtering. However, after looking at the current implementation of this part of the indexing, there are a few minor issues that may need to be clarified first:

If we want to pick a specific index to use for filtering, we would need to add an interface similar to:

index_selection = {
    "column_hints": {
        "col1": ["btree"],
        "col2": ["bloom_filter", "btree"]
    },
}

scanner = dataset.scanner(index_selection=index_selection)
  • Is it necessary to support multiple indexes for a single column here?

  • Suppose the index for a column has been found and, after applying the filter to that index, a RowIdTreeMap is obtained. Do we then need to add an approx_count_rows() interface to count the number of row_ids in the RowIdTreeMap (see the sketch below)? Additionally, can this interface also be exposed on the fragment interface?
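For clarity, a purely hypothetical sketch of the interfaces being asked about; none of these methods exist in lance today and the names are illustrative only:

# Hypothetical only: a stand-in for the row-id set (RowIdTreeMap) produced by
# applying a filter to a scalar index, plus the counting method being proposed.
class RowIdSetLike:
    def __init__(self, row_ids: set[int]):
        self._row_ids = row_ids

    def approx_count_rows(self) -> int:
        """Cheap (possibly inexact) count of matching row ids."""
        return len(self._row_ids)

# The mirrored fragment-level method would look roughly like:
#   fragment.approx_count_rows(filter) -> int   # proposed, does not exist yet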

Contributor
Yeah, today there is in general an implicit assumption of one index per column. With these inexact indexes that is no longer true, since we might want to have a zone-map or bloom filter on many columns and then have exact indexes to use at the worker. I created #4805 listing some of the work we need to do.

For here, I think we can assume 1 index per column for now so we are not blocked by it.

Contributor

@fangbo fangbo Jan 9, 2026
(quoting @jackye1995's earlier comment above about pruning fragments with inexact indexes and a separate prune_fragments method)

@jackye1995 I submitted a PR #5625 to prune fragments using filter and index. Could you please take a look and let me know if this seems reasonable? Thank you.

cc @majin1102
