4 changes: 4 additions & 0 deletions rust/lance-core/src/utils/mask.rs
@@ -621,6 +621,10 @@ impl RowAddrTreeMap {

Ok(Self { inner })
}

/// Returns the ids of all fragments that appear in this map.
pub fn fragments(&self) -> Vec<u32> {
self.inner.keys().cloned().collect()
}
}

impl std::ops::BitOr<Self> for RowAddrTreeMap {
16 changes: 16 additions & 0 deletions rust/lance/src/dataset.rs
@@ -1832,6 +1832,22 @@ impl Dataset {
.collect()
}

/// Prunes dataset fragments using scalar indices for the given filter expression.
///
/// This returns the subset of manifest fragments that still need to be scanned.
/// Fragments not covered by the participating scalar indices are always retained,
/// and if the filter does not yield a scalar index query (or the index result
/// cannot safely exclude fragments), this method is effectively a no-op and
/// returns all fragments.
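///
/// # Example
///
/// A minimal sketch of how this might be used (illustrative only; the dataset
/// URI and the `id > 100` predicate are assumptions for the example, not part
/// of this change):
///
/// ```no_run
/// # async fn example() -> lance::Result<()> {
/// let dataset = lance::dataset::Dataset::open("memory://example.lance").await?;
/// // Fragments the scalar indices can rule out are dropped; everything else,
/// // including fragments the indices do not cover, is returned for scanning.
/// let remaining = dataset.prune_fragments("id > 100").await?;
/// println!("fragments left to scan: {}", remaining.len());
/// # Ok(())
/// # }
/// ```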
pub async fn prune_fragments(&self, filter: &str) -> Result<Vec<Fragment>> {
Scanner::scalar_indexed_prune_fragments(
Arc::new(self.clone()),
filter,
self.manifest.fragments.clone(),
)
.await
}

pub fn get_fragment(&self, fragment_id: usize) -> Option<FileFragment> {
let dataset = Arc::new(self.clone());
let fragment = self
108 changes: 108 additions & 0 deletions rust/lance/src/dataset/scanner.rs
@@ -1,6 +1,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashSet;
use std::ops::Range;
use std::pin::Pin;
use std::sync::{Arc, LazyLock};
@@ -3970,6 +3971,113 @@ impl Scanner {

Ok(format!("{}", display.indent(verbose)))
}

/// Prune a list of fragments using scalar indices for a given filter.
///
/// This helper builds a temporary [`Scanner`] over `dataset`, plans `filter`
/// with scalar index support and, when possible, evaluates the scalar index
/// expression to determine which fragments contain any candidate rows.
///
/// Returns the subset of `fragments` that still need to be scanned:
/// fragments that have at least one candidate row according to the index
/// result, plus any fragments that are not fully covered by all indices.
/// Fragments outside the index coverage are never dropped.
///
/// # Notes
///
/// - Inputs:
/// - `dataset`: logical [`Dataset`] used to plan and evaluate the scalar index
/// expression.
/// - `filter`: SQL-like predicate string used for planning; it is not evaluated
/// as a full scan in this helper.
/// - `fragments`: manifest [`Fragment`]s considered for pruning.
/// - Pruning is driven only by scalar index results with *exact* or *at-most*
/// semantics. Results with *at-least* semantics cannot safely exclude any
/// fragment, so all `fragments` are returned unchanged in that case.
/// - When the index result is an allow-list [`RowAddrMask`], fragments that are
/// fully covered by the participating indices and have no allowed rows in the
/// mask are pruned. This never drops fragments that might still satisfy
/// `filter`.
/// - When the index result is a block-list [`RowAddrMask`], only fragments that
/// are both fully covered by the indices and fully blocked in the mask can be
/// pruned. Partially blocked fragments, and all fragments not covered by every
/// index, are always kept to avoid false negatives.
/// - This helper only performs scalar index planning and evaluation; it does not
/// build or execute a full scan plan.
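///
/// # Example
///
/// An illustrative sketch of calling this helper directly (the dataset URI, the
/// predicate, and the use of the `Dataset::fragments` accessor are assumptions
/// for the example, not part of this change):
///
/// ```no_run
/// # use std::sync::Arc;
/// # use lance::dataset::{scanner::Scanner, Dataset};
/// # async fn example() -> lance::Result<()> {
/// let dataset = Arc::new(Dataset::open("memory://example.lance").await?);
/// // Start from every fragment in the manifest; only fragments the scalar
/// // indices can definitively rule out are dropped.
/// let remaining = Scanner::scalar_indexed_prune_fragments(
///     dataset.clone(),
///     "category = 'books'",
///     dataset.fragments().clone(),
/// )
/// .await?;
/// println!("fragments left to scan: {}", remaining.len());
/// # Ok(())
/// # }
/// ```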
pub async fn scalar_indexed_prune_fragments(
dataset: Arc<Dataset>,
filter: &str,
fragments: Arc<Vec<Fragment>>,
) -> Result<Vec<Fragment>> {
let mut scanner = Self::new(dataset.clone());

scanner.filter(filter)?;
let filter_plan = scanner.create_filter_plan(true).await?;

if let Some(index_expr) = filter_plan.expr_filter_plan.index_query.as_ref() {
// Partition fragments into those fully covered by all scalar indices and
// those that are not.
let (covered_frags, missing_frags) = scanner
.partition_frags_by_coverage(index_expr, fragments.clone())
.await?;

// Evaluate the scalar index expression to obtain a row-address mask
// over the covered fragments.
let expr_result = index_expr
.evaluate(dataset.as_ref(), &NoOpMetricsCollector)
.await?;

match expr_result {
IndexExprResult::Exact(mask) | IndexExprResult::AtMost(mask) => match mask {
RowAddrMask::AllowList(map) => {
let allow_fragids: HashSet<u32> = map.fragments().into_iter().collect();

// Among fully covered fragments, keep only those with at least one allowed row.
let mut allow_frags: Vec<Fragment> = covered_frags
.iter()
.filter(|f| allow_fragids.contains(&(f.id as u32)))
.cloned()
.collect();

// Always keep fragments that are not fully covered by the indices.
allow_frags.extend(missing_frags);
Ok(allow_frags)
}

RowAddrMask::BlockList(map) => {
if map.is_empty() {
// No fragment is blocked by the mask; nothing can be pruned.
Ok(fragments.to_vec())
} else {
let blocked_fragids: HashSet<u32> =
map.fragments().into_iter().collect();

// Fragments that are not blocked at all or only partially blocked still
// need to be scanned.
let mut allow_frags: Vec<Fragment> = covered_frags
.iter()
.filter(|f| {
!blocked_fragids.contains(&(f.id as u32))
|| map.get_fragment_bitmap(f.id as u32).is_some()
})
.cloned()
.collect();

// Always keep fragments that are not fully covered by the indices.
allow_frags.extend(missing_frags);
Ok(allow_frags)
}
}
},

IndexExprResult::AtLeast(_) => Ok(fragments.to_vec()),
}
} else {
Ok(fragments.to_vec())
}
}
}

// Search over all indexed fields including nested ones, collecting columns that have an