From a75f007d11aa71d2a65d707279f4cea98243a2a4 Mon Sep 17 00:00:00 2001
From: "fangbo.0511"
Date: Mon, 5 Jan 2026 15:22:06 +0800
Subject: [PATCH 01/12] feat: add prune_fragments using filter and scalar indices

---
 rust/lance-core/src/utils/mask.rs |  4 +++
 rust/lance/src/dataset.rs         |  9 +++++++
 rust/lance/src/dataset/scanner.rs | 43 +++++++++++++++++++++++++++++++
 3 files changed, 56 insertions(+)

diff --git a/rust/lance-core/src/utils/mask.rs b/rust/lance-core/src/utils/mask.rs
index 7258a9b3bcd..d7375cf6de6 100644
--- a/rust/lance-core/src/utils/mask.rs
+++ b/rust/lance-core/src/utils/mask.rs
@@ -621,6 +621,10 @@ impl RowAddrTreeMap {

         Ok(Self { inner })
     }
+
+    pub fn fragments(&self) -> Vec<u32> {
+        self.inner.keys().cloned().collect()
+    }
 }

 impl std::ops::BitOr for RowAddrTreeMap {
diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index c6c9533ea44..d4c8ea9cd2b 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -1832,6 +1832,15 @@ impl Dataset {
             .collect()
     }

+    pub async fn prune_fragments(&self, filter: &str) -> Result<Vec<Fragment>> {
+        let frag_ids = Scanner::scalar_indexed_prune_fragments(
+            Arc::new(self.clone()),
+            filter,
+            self.manifest.fragments.clone(),
+        ).await?;
+        Ok(frag_ids)
+    }
+
     pub fn get_fragment(&self, fragment_id: usize) -> Option<FileFragment> {
         let dataset = Arc::new(self.clone());
         let fragment = self
diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs
index a24b0032bf6..1b974692125 100644
--- a/rust/lance/src/dataset/scanner.rs
+++ b/rust/lance/src/dataset/scanner.rs
@@ -1,6 +1,7 @@
 // SPDX-License-Identifier: Apache-2.0
 // SPDX-FileCopyrightText: Copyright The Lance Authors

+use std::collections::HashSet;
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::{Arc, LazyLock};
@@ -3970,6 +3971,48 @@ impl Scanner {

         Ok(format!("{}", display.indent(verbose)))
     }
+
+    pub async fn scalar_indexed_prune_fragments(
+        dataset: Arc<Dataset>,
+        filter: &str,
+        fragments: Arc<Vec<Fragment>>) -> Result<Vec<Fragment>> {
+        let mut scanner = Scanner::new(dataset.clone()).filter(filter)?;
+
+        let filter_plan = scanner.create_filter_plan(true).await?;
+
+        if let Some(index_expr) = filter_plan.expr_filter_plan.index_query.as_ref() {
+            // Figure out which fragments are covered by ALL indices
+            let (_, missing_frags) = scanner
+                .partition_frags_by_coverage(index_expr, fragments)
+                .await?;
+
+            // Evaluate indices to find out which fragments have matching rows
+            let expr_result = index_expr.evaluate(dataset.as_ref(), &NoOpMetricsCollector).await?;
+
+            match expr_result {
+                IndexExprResult::Exact(mask) | IndexExprResult::AtMost(mask) => {
+                    match mask {
+                        RowAddrMask::AllowList(map) => {
+                            let allow_fragids : HashSet<u32> = map.fragments().into_iter().collect();
+                            let mut allow_frags : Vec<Fragment> = fragments
+                                .iter()
+                                .filter(|f| allow_fragids.contains(*f.id))
+                                .cloned()
+                                .collect();
+
+                            allow_frags.extend(missing_frags);
+                            Ok(allow_frags)
+                        }
+                        RowAddrMask::BlockList(_) => Ok(fragments.to_vec())
+                    }
+                }
+
+                IndexExprResult::AtLeast(_) => Ok(fragments.to_vec())
+            }
+        } else {
+            Ok(fragments.to_vec())
+        }
+    }
 }

 // Search over all indexed fields including nested ones, collecting columns that have an

From 803e6b79180d6597ec490f7d49fc3afb7916e47f Mon Sep 17 00:00:00 2001
From: "fangbo.0511"
Date: Mon, 5 Jan 2026 15:31:49 +0800
Subject: [PATCH 02/12] minor update

---
 rust/lance/src/dataset.rs | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index d4c8ea9cd2b..3bcc6c06b3b 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -1832,13 +1832,12 @@ impl Dataset {
             .collect()
     }

-    pub async fn prune_fragments(&self, filter: &str) -> Result<Vec<Fragment>> {
-        let frag_ids = Scanner::scalar_indexed_prune_fragments(
+    pub async fn prune_fragments(&self, filter: &str) -> Result<Vec<Fragment>> {
+        Scanner::scalar_indexed_prune_fragments(
             Arc::new(self.clone()),
             filter,
             self.manifest.fragments.clone(),
-        ).await?;
-        Ok(frag_ids)
+        ).await
     }

     pub fn get_fragment(&self, fragment_id: usize) -> Option<FileFragment> {

From 0a2d269a91aa38e3dd4f3b5f9d81a3ccd09d63b0 Mon Sep 17 00:00:00 2001
From: "fangbo.0511"
Date: Mon, 5 Jan 2026 17:09:10 +0800
Subject: [PATCH 03/12] minor update

---
 rust/lance/src/dataset/scanner.rs | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs
index 1b974692125..2270048db1c 100644
--- a/rust/lance/src/dataset/scanner.rs
+++ b/rust/lance/src/dataset/scanner.rs
@@ -3972,6 +3972,24 @@ impl Scanner {

         Ok(format!("{}", display.indent(verbose)))
     }
+
+    /// Prune a list of fragments using scalar indices for a given filter.
+    ///
+    /// This helper builds a temporary [`Scanner`] over `dataset`, plans `filter`
+    /// with scalar index support and, when possible, evaluates the scalar index
+    /// expression to determine which fragments contain any candidate rows.
+    ///
+    /// Returns the subset of `fragments` that still need to be scanned:
+    /// fragments that have at least one candidate row according to the index
+    /// result, plus any fragments that are not fully covered by all indices.
+    /// Fragments outside the index coverage are never dropped.
+    ///
+    /// Pruning only happens when the filter plan contains a scalar index query
+    /// whose evaluation yields an allow-list [`RowAddrMask`] with *exact* or
+    /// *at-most* semantics. In that case, fragments that are fully covered by
+    /// the indices and have no allowed rows are pruned. If there is no index
+    /// query, the result has *at-least* semantics, or the index result is
+    /// represented as a block-list, all input `fragments` are returned
+    /// unchanged.
     pub async fn scalar_indexed_prune_fragments(
         dataset: Arc<Dataset>,
         filter: &str,
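
At this point in the series, the intended usage shape is roughly the
following. This is an illustrative sketch only, not part of the patches:
the URI and filter are made up, and it assumes `Scanner::with_fragments`
is available for restricting a scan to a fragment subset.

    use std::sync::Arc;

    use futures::TryStreamExt;
    use lance::Dataset;

    // Hedged sketch: prune first, then scan only the surviving fragments.
    // "/tmp/my_dataset.lance" and the "i >= 30" filter are illustrative.
    async fn scan_pruned() -> lance::Result<()> {
        let dataset = Dataset::open("/tmp/my_dataset.lance").await?;

        // Fragments that may still contain rows matching the filter.
        let fragments = dataset.prune_fragments("i >= 30").await?;

        // Scan just those fragments; the filter still refines rows within them.
        let mut scanner = dataset.scan();
        scanner.with_fragments(fragments).filter("i >= 30")?;
        let batches: Vec<_> = scanner.try_into_stream().await?.try_collect().await?;
        let _ = batches;
        Ok(())
    }

The key design point is that pruning is a pre-filter on fragments, not on
rows: the row-level predicate must still run over whatever survives.
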
From d91c6d8445558e3973544930afaf978145b696e7 Mon Sep 17 00:00:00 2001
From: "fangbo.0511"
Date: Mon, 5 Jan 2026 20:16:29 +0800
Subject: [PATCH 04/12] add test cases

---
 rust/lance/src/dataset.rs                     |  3 +-
 rust/lance/src/dataset/scanner.rs             | 42 +++++----
 rust/lance/src/dataset/tests/dataset_index.rs | 92 +++++++++++++++++++
 3 files changed, 116 insertions(+), 21 deletions(-)

diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index 3bcc6c06b3b..d227d60cfde 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -1837,7 +1837,8 @@ impl Dataset {
             Arc::new(self.clone()),
             filter,
             self.manifest.fragments.clone(),
-        ).await
+        )
+        .await
     }

     pub fn get_fragment(&self, fragment_id: usize) -> Option<FileFragment> {
diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs
index 2270048db1c..d9d4a98c877 100644
--- a/rust/lance/src/dataset/scanner.rs
+++ b/rust/lance/src/dataset/scanner.rs
@@ -3993,39 +3993,41 @@ impl Scanner {
     pub async fn scalar_indexed_prune_fragments(
         dataset: Arc<Dataset>,
         filter: &str,
-        fragments: Arc<Vec<Fragment>>) -> Result<Vec<Fragment>> {
-        let mut scanner = Scanner::new(dataset.clone()).filter(filter)?;
+        fragments: Arc<Vec<Fragment>>,
+    ) -> Result<Vec<Fragment>> {
+        let mut scanner = Self::new(dataset.clone());
+        scanner.filter(filter)?;

         let filter_plan = scanner.create_filter_plan(true).await?;

         if let Some(index_expr) = filter_plan.expr_filter_plan.index_query.as_ref() {
             // Figure out which fragments are covered by ALL indices
             let (_, missing_frags) = scanner
-                .partition_frags_by_coverage(index_expr, fragments)
+                .partition_frags_by_coverage(index_expr, fragments.clone())
                 .await?;

             // Evaluate indices to find out which fragments have matching rows
-            let expr_result = index_expr.evaluate(dataset.as_ref(), &NoOpMetricsCollector).await?;
+            let expr_result = index_expr
+                .evaluate(dataset.as_ref(), &NoOpMetricsCollector)
+                .await?;

             match expr_result {
-                IndexExprResult::Exact(mask) | IndexExprResult::AtMost(mask) => {
-                    match mask {
-                        RowAddrMask::AllowList(map) => {
-                            let allow_fragids : HashSet<u32> = map.fragments().into_iter().collect();
-                            let mut allow_frags : Vec<Fragment> = fragments
-                                .iter()
-                                .filter(|f| allow_fragids.contains(*f.id))
-                                .cloned()
-                                .collect();
-
-                            allow_frags.extend(missing_frags);
-                            Ok(allow_frags)
-                        }
-                        RowAddrMask::BlockList(_) => Ok(fragments.to_vec())
+                IndexExprResult::Exact(mask) | IndexExprResult::AtMost(mask) => match mask {
+                    RowAddrMask::AllowList(map) => {
+                        let allow_fragids: HashSet<u32> = map.fragments().into_iter().collect();
+                        let mut allow_frags: Vec<Fragment> = fragments.clone()
+                            .iter()
+                            .filter(|f| allow_fragids.contains(&(f.id as u32)))
+                            .cloned()
+                            .collect();
+
+                        allow_frags.extend(missing_frags);
+                        Ok(allow_frags)
                     }
-                }
+                    RowAddrMask::BlockList(_) => Ok(fragments.to_vec()),
+                },

-                IndexExprResult::AtLeast(_) => Ok(fragments.to_vec())
+                IndexExprResult::AtLeast(_) => Ok(fragments.to_vec()),
             }
         } else {
             Ok(fragments.to_vec())
diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs
index ce8254cbd52..daafebec5db 100644
--- a/rust/lance/src/dataset/tests/dataset_index.rs
+++ b/rust/lance/src/dataset/tests/dataset_index.rs
@@ -5,10 +5,12 @@ use std::collections::{HashMap, HashSet};
 use std::sync::Arc;
 use std::vec;

+use crate::dataset::scanner::test_dataset::TestVectorDataset;
 use crate::dataset::tests::dataset_migrations::scan_dataset;
 use crate::dataset::tests::dataset_transactions::{assert_results, execute_sql};
 use crate::dataset::ROW_ID;
 use crate::index::vector::VectorIndexParams;
+use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
 use crate::{Dataset, Error, Result};

 use lance_arrow::FixedSizeListArrayExt;
@@ -2464,3 +2466,93 @@ async fn test_auto_infer_lance_tokenizer() {
         .unwrap();
     assert_eq!(1, batch.num_rows());
 }
+
+
+#[tokio::test]
+async fn test_prune_fragments_without_scalar_index_returns_all() {
+    // Build a small dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49].
+    let dataset = gen_batch()
+        .col("i", array::step::<Int32Type>())
+        .into_ram_dataset(FragmentCount::from(5), FragmentRowCount::from(10))
+        .await
+        .unwrap();
+
+    let original_fragments = dataset.fragments().clone();
+
+    // Without a scalar index, pruning should be a no-op and return all fragments.
+    let pruned = dataset.prune_fragments("i >= 30").await.unwrap();
+
+    assert_eq!(pruned.len(), original_fragments.len());
+    let original_ids: Vec<u64> = original_fragments.iter().map(|f| f.id).collect();
+    let pruned_ids: Vec<u64> = pruned.iter().map(|f| f.id).collect();
+    assert_eq!(pruned_ids, original_ids);
+}
+
+#[tokio::test]
+async fn test_prune_fragments_with_scalar_index_prunes_non_matching_fragments() {
+    // Dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49].
+    let mut dataset = gen_batch()
+        .col("i", array::step::<Int32Type>())
+        .into_ram_dataset(FragmentCount::from(5), FragmentRowCount::from(10))
+        .await
+        .unwrap();
+
+    // Create a scalar index on i so all current fragments are indexed.
+    dataset
+        .create_index(
+            &["i"],
+            IndexType::Scalar,
+            None,
+            &ScalarIndexParams::default(),
+            true,
+        )
+        .await
+        .unwrap();
+
+    let fragments = dataset.fragments().clone();
+    assert!(fragments.len() >= 3);
+
+    // For filter i >= 30, all matching rows live in the last two fragments.
+    let expected_tail_ids: Vec<u64> = fragments[fragments.len() - 2..]
+        .iter()
+        .map(|f| f.id)
+        .collect();
+
+    let pruned = dataset.prune_fragments("i >= 30").await.unwrap();
+    let pruned_ids: Vec<u64> = pruned.iter().map(|f| f.id).collect();
+
+    assert_eq!(pruned_ids, expected_tail_ids);
+}
+
+#[tokio::test]
+async fn test_prune_fragments_keeps_fragments_outside_index_coverage() {
+    // Use TestVectorDataset to get a dataset with an Int32 column "i".
+    let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
+        .await
+        .unwrap();
+
+    // Build a scalar index on i covering the initial fragments.
+    test_ds.make_scalar_index().await.unwrap();
+
+    let before_fragments = test_ds.dataset.fragments().clone();
+    let before_ids: HashSet<u64> = before_fragments.iter().map(|f| f.id).collect();
+
+    // Append new data so the new fragment is not covered by the existing index.
+    test_ds.append_new_data().await.unwrap();
+    let all_fragments = test_ds.dataset.fragments().clone();
+    let new_fragments: Vec<_> = all_fragments
+        .iter()
+        .filter(|f| !before_ids.contains(&f.id))
+        .collect();
+
+    // Sanity check: we expect exactly one newly appended fragment.
+    assert_eq!(new_fragments.len(), 1);
+    let new_fragment_id = new_fragments[0].id;
+
+    // Use a filter that only matches early rows, which live in the original fragments.
+    let pruned = test_ds.dataset.prune_fragments("i < 10").await.unwrap();
+    let pruned_ids: HashSet<u64> = pruned.iter().map(|f| f.id).collect();
+
+    // Fragments without index coverage must not be pruned.
+    assert!(pruned_ids.contains(&new_fragment_id));
+}

From d8c6d1d82809b004c32f9f93594a6d11b06c9a4f Mon Sep 17 00:00:00 2001
From: "fangbo.0511"
Date: Mon, 5 Jan 2026 20:19:20 +0800
Subject: [PATCH 05/12] format code

---
 rust/lance/src/dataset/scanner.rs             | 3 ++-
 rust/lance/src/dataset/tests/dataset_index.rs | 1 -
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs
index d9d4a98c877..f3b31d72a6e 100644
--- a/rust/lance/src/dataset/scanner.rs
+++ b/rust/lance/src/dataset/scanner.rs
@@ -4015,7 +4015,8 @@ impl Scanner {
                 IndexExprResult::Exact(mask) | IndexExprResult::AtMost(mask) => match mask {
                     RowAddrMask::AllowList(map) => {
                         let allow_fragids: HashSet<u32> = map.fragments().into_iter().collect();
-                        let mut allow_frags: Vec<Fragment> = fragments.clone()
+                        let mut allow_frags: Vec<Fragment> = fragments
+                            .clone()
                             .iter()
                             .filter(|f| allow_fragids.contains(&(f.id as u32)))
                             .cloned()
                             .collect();
diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs
index daafebec5db..6d82ce74890 100644
--- a/rust/lance/src/dataset/tests/dataset_index.rs
+++ b/rust/lance/src/dataset/tests/dataset_index.rs
@@ -2467,7 +2467,6 @@ async fn test_auto_infer_lance_tokenizer() {
         .unwrap();
     assert_eq!(1, batch.num_rows());
 }
-

 #[tokio::test]
 async fn test_prune_fragments_without_scalar_index_returns_all() {
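
The refactor in the next patch extends pruning from allow-lists to
block-list masks. The decision rule it implements can be stated as a small
self-contained sketch; the types below are simplified stand-ins, not the
lance-core definitions:

    use std::collections::HashSet;

    /// Simplified stand-in for a row-address mask: which fragments the index
    /// result mentions, and whether a listed fragment is fully or partially hit.
    enum Mask {
        /// Fragments that contain at least one candidate row.
        AllowList(HashSet<u32>),
        /// Fragments whose rows are excluded; `partial` marks fragments where
        /// only some rows are blocked.
        BlockList { blocked: HashSet<u32>, partial: HashSet<u32> },
    }

    /// Decide whether a fully index-covered fragment still needs scanning.
    fn needs_scan(mask: &Mask, frag_id: u32) -> bool {
        match mask {
            // Allow-list: scan only fragments with candidate rows.
            Mask::AllowList(allowed) => allowed.contains(&frag_id),
            // Block-list: skip only fragments that are fully blocked.
            Mask::BlockList { blocked, partial } => {
                !blocked.contains(&frag_id) || partial.contains(&frag_id)
            }
        }
    }

Fragments outside index coverage bypass this rule entirely and are always
scanned, which is why the refactor threads `covered_frags` and
`missing_frags` separately.
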
From 1cf144664a5abe54bff97ae0edf4aef39dc1de1b Mon Sep 17 00:00:00 2001
From: "fangbo.0511"
Date: Tue, 6 Jan 2026 21:10:43 +0800
Subject: [PATCH 06/12] refactor code

---
 rust/lance/src/dataset.rs                     |  7 +++
 rust/lance/src/dataset/scanner.rs             | 31 +++++++++--
 rust/lance/src/dataset/tests/dataset_index.rs | 54 +++++++++++++++----
 3 files changed, 79 insertions(+), 13 deletions(-)

diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs
index d227d60cfde..d4eba7bf684 100644
--- a/rust/lance/src/dataset.rs
+++ b/rust/lance/src/dataset.rs
@@ -1832,6 +1832,13 @@ impl Dataset {
             .collect()
     }

+    /// Prunes dataset fragments using scalar indices for the given filter expression.
+    ///
+    /// This returns the subset of manifest fragments that still need to be scanned,
+    /// in manifest order. Fragments not covered by the participating scalar indices
+    /// are always retained, and if the filter does not yield a scalar index query
+    /// (or the index result cannot safely exclude fragments), this method is effectively
+    /// a no-op and returns all fragments.
     pub async fn prune_fragments(&self, filter: &str) -> Result<Vec<Fragment>> {
         Scanner::scalar_indexed_prune_fragments(
             Arc::new(self.clone()),
diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs
index f3b31d72a6e..7fed5812af6 100644
--- a/rust/lance/src/dataset/scanner.rs
+++ b/rust/lance/src/dataset/scanner.rs
@@ -4002,7 +4002,7 @@ impl Scanner {

         if let Some(index_expr) = filter_plan.expr_filter_plan.index_query.as_ref() {
             // Figure out which fragments are covered by ALL indices
-            let (_, missing_frags) = scanner
+            let (covered_frags, missing_frags) = scanner
                 .partition_frags_by_coverage(index_expr, fragments.clone())
                 .await?;

@@ -4015,17 +4015,42 @@ impl Scanner {
                 IndexExprResult::Exact(mask) | IndexExprResult::AtMost(mask) => match mask {
                     RowAddrMask::AllowList(map) => {
                         let allow_fragids: HashSet<u32> = map.fragments().into_iter().collect();
-                        let mut allow_frags: Vec<Fragment> = fragments
+
+                        let mut allow_frags: Vec<Fragment> = covered_frags
                             .clone()
                             .iter()
                             .filter(|f| allow_fragids.contains(&(f.id as u32)))
                             .cloned()
                             .collect();

+                        // Always return index uncovered fragments
                         allow_frags.extend(missing_frags);
                         Ok(allow_frags)
                     }
-                    RowAddrMask::BlockList(_) => Ok(fragments.to_vec()),
+
+                    RowAddrMask::BlockList(map) => {
+                        if map.is_empty() {
+                            // No fragment is blocked, return all fragments
+                            Ok(fragments.to_vec())
+                        } else {
+                            let blocked_fragids: HashSet<u32> = map.fragments().into_iter().collect();
+
+                            // If a fragment is not blocked, or is only partially blocked, it still needs to be scanned.
+                            let mut allow_frags: Vec<Fragment> = covered_frags
+                                .clone()
+                                .iter()
+                                .filter(|f| {
+                                    !blocked_fragids.contains(&(f.id as u32))
+                                        || map.get_fragment_bitmap(f.id as u32).is_some()
+                                })
+                                .cloned()
+                                .collect();
+
+                            // Always return index uncovered fragments
+                            allow_frags.extend(missing_frags);
+                            Ok(allow_frags)
+                        }
+                    }
                 },

                 IndexExprResult::AtLeast(_) => Ok(fragments.to_vec()),
diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs
index 6d82ce74890..ecde0d6ef64 100644
--- a/rust/lance/src/dataset/tests/dataset_index.rs
+++ b/rust/lance/src/dataset/tests/dataset_index.rs
@@ -10,7 +10,6 @@ use crate::dataset::tests::dataset_migrations::scan_dataset;
 use crate::dataset::tests::dataset_transactions::{assert_results, execute_sql};
 use crate::dataset::ROW_ID;
 use crate::index::vector::VectorIndexParams;
-use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
 use crate::{Dataset, Error, Result};

 use lance_arrow::FixedSizeListArrayExt;
@@ -2469,10 +2468,27 @@ async fn test_auto_infer_lance_tokenizer() {

 #[tokio::test]
 async fn test_prune_fragments_without_scalar_index_returns_all() {
-    // Build a small dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49].
-    let dataset = gen_batch()
-        .col("i", array::step::<Int32Type>())
-        .into_ram_dataset(FragmentCount::from(5), FragmentRowCount::from(10))
+    // Build a dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49].
+    let test_uri = TempStrDir::default();
+    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+        "i",
+        DataType::Int32,
+        false,
+    )]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(Int32Array::from_iter_values(0..50))],
+    )
+    .unwrap();
+
+    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+    let write_params = WriteParams {
+        max_rows_per_file: 10,
+        max_rows_per_group: 10,
+        ..Default::default()
+    };
+    let dataset = Dataset::write(reader, &test_uri, Some(write_params))
         .await
         .unwrap();

@@ -2490,9 +2506,26 @@ async fn test_prune_fragments_without_scalar_index_returns_all() {
 #[tokio::test]
 async fn test_prune_fragments_with_scalar_index_prunes_non_matching_fragments() {
     // Dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49].
-    let mut dataset = gen_batch()
-        .col("i", array::step::<Int32Type>())
-        .into_ram_dataset(FragmentCount::from(5), FragmentRowCount::from(10))
+    let test_uri = TempStrDir::default();
+    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+        "i",
+        DataType::Int32,
+        false,
+    )]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(Int32Array::from_iter_values(0..50))],
+    )
+    .unwrap();
+
+    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+    let write_params = WriteParams {
+        max_rows_per_file: 10,
+        max_rows_per_group: 10,
+        ..Default::default()
+    };
+    let mut dataset = Dataset::write(reader, &test_uri, Some(write_params))
         .await
         .unwrap();

@@ -2509,7 +2542,7 @@ async fn test_prune_fragments_with_scalar_index_prunes_non_matching_fragments()
         .unwrap();

     let fragments = dataset.fragments().clone();
-    assert!(fragments.len() >= 3);
+    assert_eq!(fragments.len(), 5);

     // For filter i >= 30, all matching rows live in the last two fragments.
     let expected_tail_ids: Vec<u64> = fragments[fragments.len() - 2..]
@@ -2525,7 +2558,8 @@ async fn test_prune_fragments_with_scalar_index_prunes_non_matching_fragments()

 #[tokio::test]
 async fn test_prune_fragments_keeps_fragments_outside_index_coverage() {
-    // Use TestVectorDataset to get a dataset with an Int32 column "i".
+    // Use TestVectorDataset to construct a multi-fragment dataset with an Int32 column "i".
+    // This matches the pattern used elsewhere in dataset_index.rs for multi-fragment tests.
     let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
         .await
         .unwrap();

From 59011245a528822a3c36df91a98cfc88cba0264c Mon Sep 17 00:00:00 2001
From: "fangbo.0511"
Date: Wed, 7 Jan 2026 14:01:21 +0800
Subject: [PATCH 07/12] modify comments

---
 rust/lance/src/dataset/scanner.rs | 47 ++++++++++++++++++++++---------
 1 file changed, 33 insertions(+), 14 deletions(-)

diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs
index 7fed5812af6..6d568946c2b 100644
--- a/rust/lance/src/dataset/scanner.rs
+++ b/rust/lance/src/dataset/scanner.rs
@@ -3983,13 +3983,27 @@ impl Scanner {
     /// result, plus any fragments that are not fully covered by all indices.
     /// Fragments outside the index coverage are never dropped.
     ///
-    /// Pruning only happens when the filter plan contains a scalar index query
-    /// whose evaluation yields an allow-list [`RowAddrMask`] with *exact* or
-    /// *at-most* semantics. In that case, fragments that are fully covered by
-    /// the indices and have no allowed rows are pruned. If there is no index
-    /// query, the result has *at-least* semantics, or the index result is
-    /// represented as a block-list, all input `fragments` are returned
-    /// unchanged.
+    /// # Notes
+    ///
+    /// - Inputs:
+    ///   - `dataset`: logical [`Dataset`] used to plan and evaluate the scalar index
+    ///     expression.
+    ///   - `filter`: SQL-like predicate string used for planning; it is not evaluated
+    ///     as a full scan in this helper.
+    ///   - `fragments`: manifest [`Fragment`]s considered for pruning.
+    /// - Pruning is driven only by scalar index results with *exact* or *at-most*
+    ///   semantics. Results with *at-least* semantics cannot safely exclude any
+    ///   fragment, so all `fragments` are returned unchanged in that case.
+    /// - When the index result is an allow-list [`RowAddrMask`], fragments that are
+    ///   fully covered by the participating indices and have no allowed rows in the
+    ///   mask are pruned. This never drops fragments that might still satisfy
+    ///   `filter`.
+    /// - When the index result is a block-list [`RowAddrMask`], only fragments that
+    ///   are both fully covered by the indices and fully blocked in the mask can be
+    ///   pruned. Partially blocked fragments, and all fragments not covered by every
+    ///   index, are always kept to avoid false negatives.
+    /// - This helper only performs scalar index planning and evaluation; it does not
+    ///   build or execute a full scan plan.
     pub async fn scalar_indexed_prune_fragments(
         dataset: Arc<Dataset>,
         filter: &str,
@@ -4001,52 +4015,57 @@ impl Scanner {
         let filter_plan = scanner.create_filter_plan(true).await?;

         if let Some(index_expr) = filter_plan.expr_filter_plan.index_query.as_ref() {
-            // Figure out which fragments are covered by ALL indices
+            // Partition fragments into those fully covered by all scalar indices and
+            // those that are not.
             let (covered_frags, missing_frags) = scanner
                 .partition_frags_by_coverage(index_expr, fragments.clone())
                 .await?;

-            // Evaluate indices to find out which fragments have matching rows
+            // Evaluate the scalar index expression to obtain a row-address mask
+            // over the covered fragments.
             let expr_result = index_expr
                 .evaluate(dataset.as_ref(), &NoOpMetricsCollector)
                 .await?;

             match expr_result {
                 IndexExprResult::Exact(mask) | IndexExprResult::AtMost(mask) => match mask {
                     RowAddrMask::AllowList(map) => {
                         let allow_fragids: HashSet<u32> = map.fragments().into_iter().collect();

+                        // Among fully covered fragments, keep only those with at least one allowed row.
                         let mut allow_frags: Vec<Fragment> = covered_frags
                             .clone()
                             .iter()
                             .filter(|f| allow_fragids.contains(&(f.id as u32)))
                             .cloned()
                             .collect();

-                        // Always return index uncovered fragments
+                        // Always keep fragments that are not fully covered by the indices.
                         allow_frags.extend(missing_frags);
                         Ok(allow_frags)
                     }

                     RowAddrMask::BlockList(map) => {
                         if map.is_empty() {
-                            // No fragment is blocked, return all fragments
+                            // No fragment is blocked by the mask; nothing can be pruned.
                             Ok(fragments.to_vec())
                         } else {
-                            let blocked_fragids: HashSet<u32> = map.fragments().into_iter().collect();
+                            let blocked_fragids: HashSet<u32> =
+                                map.fragments().into_iter().collect();

-                            // If a fragment is not blocked, or is only partially blocked, it still needs to be scanned.
+                            // Fragments that are not blocked at all or only partially blocked still
+                            // need to be scanned.
                             let mut allow_frags: Vec<Fragment> = covered_frags
                                 .clone()
                                 .iter()
                                 .filter(|f| {
                                     !blocked_fragids.contains(&(f.id as u32))
                                         || map.get_fragment_bitmap(f.id as u32).is_some()
                                 })
                                 .cloned()
                                 .collect();

-                            // Always return index uncovered fragments
+                            // Always keep fragments that are not fully covered by the indices.
                             allow_frags.extend(missing_frags);
                             Ok(allow_frags)
                         }

From 7a9244841633fde467c0a83e0d473c250ce23fdd Mon Sep 17 00:00:00 2001
From: "fangbo.0511"
Date: Wed, 7 Jan 2026 20:33:09 +0800
Subject: [PATCH 08/12] add more test cases

---
 rust/lance/src/dataset/tests/dataset_index.rs | 73 +++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs
index ecde0d6ef64..c658fbefec9 100644
--- a/rust/lance/src/dataset/tests/dataset_index.rs
+++ b/rust/lance/src/dataset/tests/dataset_index.rs
@@ -2556,6 +2556,79 @@ async fn test_prune_fragments_with_scalar_index_prunes_non_matching_fragments()
     assert_eq!(pruned_ids, expected_tail_ids);
 }

+#[tokio::test]
+async fn test_prune_fragments_with_scalar_index_and_mixed_or_filter_is_noop() {
+    // Multi-column dataset with predictable fragment boundaries: 5 fragments
+    // of 10 rows each, columns col_a, col_b, col_c.
+    let test_uri = TempStrDir::default();
+    let schema = Arc::new(ArrowSchema::new(vec![
+        ArrowField::new("col_a", DataType::Int32, false),
+        ArrowField::new("col_b", DataType::Int32, false),
+        ArrowField::new("col_c", DataType::Int32, false),
+    ]));
+
+    // col_a: 0..50 (monotonic sequence for range queries)
+    let col_a = Int32Array::from_iter_values(0..50);
+    // col_b: first fragment has small values (< 10) so rows there can only
+    // match the filter via the non-indexed side `col_b < 10`; later fragments
+    // have large values.
+    let col_b =
+        Int32Array::from_iter_values((0..50).map(
+            |i| {
+                if i < 10 {
+                    i
+                } else {
+                    100 + i
+                }
+            },
+        ));
+    // col_c: arbitrary third column, no index.
+    let col_c = Int32Array::from_iter_values((0..50).map(|i| i * 2));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(col_a), Arc::new(col_b), Arc::new(col_c)],
+    )
+    .unwrap();
+
+    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+    let write_params = WriteParams {
+        max_rows_per_file: 10,
+        max_rows_per_group: 10,
+        ..Default::default()
+    };
+    let mut dataset = Dataset::write(reader, &test_uri, Some(write_params))
+        .await
+        .unwrap();
+
+    // Create a scalar index only on col_a.
+    dataset
+        .create_index(
+            &["col_a"],
+            IndexType::Scalar,
+            None,
+            &ScalarIndexParams::default(),
+            true,
+        )
+        .await
+        .unwrap();
+
+    let fragments = dataset.fragments().clone();
+    assert_eq!(fragments.len(), 5);
+
+    // For filter `col_a >= 10 OR col_b < 10`, only `col_a` is indexable. The
+    // planner should treat this OR as mixed indexability and fall back to a
+    // refine-only filter plan, so scalar-index-based pruning becomes a no-op
+    // and all fragments are retained.
+    let pruned = dataset
+        .prune_fragments("col_a >= 10 OR col_b < 10")
+        .await
+        .unwrap();
+    let original_ids: Vec<u64> = fragments.iter().map(|f| f.id).collect();
+    let pruned_ids: Vec<u64> = pruned.iter().map(|f| f.id).collect();
+    assert_eq!(pruned_ids, original_ids);
+}
+
 #[tokio::test]
 async fn test_prune_fragments_keeps_fragments_outside_index_coverage() {
     // Use TestVectorDataset to construct a multi-fragment dataset with an Int32 column "i".
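
The coverage behavior exercised by
test_prune_fragments_keeps_fragments_outside_index_coverage can be modeled
in a few lines. This is a toy sketch under an assumed coverage rule (the
real partition_frags_by_coverage consults index metadata, not a max id):

    /// Toy model: assume an index covers fragments only up to the id that
    /// existed when it was trained; later appends are uncovered.
    fn partition_by_coverage(fragment_ids: &[u32], max_indexed_id: u32) -> (Vec<u32>, Vec<u32>) {
        let (covered, missing): (Vec<u32>, Vec<u32>) = fragment_ids
            .iter()
            .partition(|&&id| id <= max_indexed_id);
        (covered, missing)
    }

    fn main() {
        // Fragments 0..=2 were indexed; fragment 3 was appended afterwards.
        let (covered, missing) = partition_by_coverage(&[0, 1, 2, 3], 2);
        assert_eq!(covered, vec![0, 1, 2]);
        // Uncovered fragments are never pruned, whatever the index says.
        assert_eq!(missing, vec![3]);
    }

Only the `covered` side is ever filtered by the mask; the `missing` side is
appended back unconditionally, which is the invariant the test pins down.
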
From 6bd22e2f39b9df6e033c8e08722c03a2dbe217bf Mon Sep 17 00:00:00 2001
From: "fangbo.0511"
Date: Wed, 7 Jan 2026 20:37:01 +0800
Subject: [PATCH 09/12] fix format issue

---
 rust/lance/src/dataset/tests/dataset_index.rs | 11 +----------
 1 file changed, 1 insertion(+), 10 deletions(-)

diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs
index c658fbefec9..419f5db0789 100644
--- a/rust/lance/src/dataset/tests/dataset_index.rs
+++ b/rust/lance/src/dataset/tests/dataset_index.rs
@@ -2572,16 +2572,7 @@ async fn test_prune_fragments_with_scalar_index_and_mixed_or_filter_is_noop() {
     // col_b: first fragment has small values (< 10) so rows there can only
     // match the filter via the non-indexed side `col_b < 10`; later fragments
     // have large values.
-    let col_b =
-        Int32Array::from_iter_values((0..50).map(
-            |i| {
-                if i < 10 {
-                    i
-                } else {
-                    100 + i
-                }
-            },
-        ));
+    let col_b = Int32Array::from_iter_values((0..50).map(|i| if i < 10 { i } else { 100 + i }));
     // col_c: arbitrary third column, no index.
     let col_c = Int32Array::from_iter_values((0..50).map(|i| i * 2));

From 0069d02e5741f105c5502a22e8e53155b635c568 Mon Sep 17 00:00:00 2001
From: "fangbo.0511"
Date: Wed, 7 Jan 2026 21:25:11 +0800
Subject: [PATCH 10/12] add more test cases

---
 rust/lance/src/dataset/tests/dataset_index.rs | 234 +++++++++++++++++-
 1 file changed, 233 insertions(+), 1 deletion(-)

diff --git a/rust/lance/src/dataset/tests/dataset_index.rs b/rust/lance/src/dataset/tests/dataset_index.rs
index 419f5db0789..8ed06788512 100644
--- a/rust/lance/src/dataset/tests/dataset_index.rs
+++ b/rust/lance/src/dataset/tests/dataset_index.rs
@@ -36,7 +36,11 @@ use lance_index::scalar::inverted::{
 };
 use lance_index::scalar::FullTextSearchQuery;
 use lance_index::DatasetIndexExt;
-use lance_index::{scalar::ScalarIndexParams, vector::DIST_COL, IndexType};
+use lance_index::{
+    scalar::{BuiltinIndexType, ScalarIndexParams},
+    vector::DIST_COL,
+    IndexType,
+};
 use lance_linalg::distance::MetricType;

 use datafusion::common::{assert_contains, assert_not_contains};
@@ -2653,3 +2657,231 @@ async fn test_prune_fragments_keeps_fragments_outside_index_coverage() {
     // Fragments without index coverage must not be pruned.
     assert!(pruned_ids.contains(&new_fragment_id));
 }
+
+#[tokio::test]
+async fn test_prune_fragments_with_zonemap_scalar_index_prunes_non_matching_fragments() {
+    // Dataset with 5 fragments of 10 rows each: z = [0, 1, ..., 49].
+    let test_uri = TempStrDir::default();
+    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+        "z",
+        DataType::Int32,
+        false,
+    )]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(Int32Array::from_iter_values(0..50))],
+    )
+    .unwrap();
+
+    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+    let write_params = WriteParams {
+        max_rows_per_file: 10,
+        max_rows_per_group: 10,
+        ..Default::default()
+    };
+    let mut dataset = Dataset::write(reader, &test_uri, Some(write_params))
+        .await
+        .unwrap();
+
+    // Create a ZoneMap scalar index on z so all current fragments are indexed.
+    let zonemap_params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap);
+    dataset
+        .create_index(&["z"], IndexType::Scalar, None, &zonemap_params, true)
+        .await
+        .unwrap();
+
+    let fragments = dataset.fragments().clone();
+    assert_eq!(fragments.len(), 5);
+
+    // For filter z >= 30, all matching rows live in the last two fragments.
+    // ZoneMap returns an AtMost allow-list mask, and scalar_indexed_prune_fragments
+    // prunes covered fragments that have no allowed rows while keeping uncovered
+    // fragments, so we expect only the tail fragments to remain.
+    let expected_tail_ids: Vec<u64> = fragments[fragments.len() - 2..]
+        .iter()
+        .map(|f| f.id)
+        .collect();
+
+    let pruned = dataset.prune_fragments("z >= 30").await.unwrap();
+    let pruned_ids: Vec<u64> = pruned.iter().map(|f| f.id).collect();
+
+    assert_eq!(pruned_ids, expected_tail_ids);
+}
+
+#[tokio::test]
+async fn test_prune_fragments_with_scalar_index_blocklist_partial_keeps_all_fragments() {
+    // Dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49].
+    let test_uri = TempStrDir::default();
+    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+        "i",
+        DataType::Int32,
+        false,
+    )]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(Int32Array::from_iter_values(0..50))],
+    )
+    .unwrap();
+
+    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+    let write_params = WriteParams {
+        max_rows_per_file: 10,
+        max_rows_per_group: 10,
+        ..Default::default()
+    };
+    let mut dataset = Dataset::write(reader, &test_uri, Some(write_params))
+        .await
+        .unwrap();
+
+    // Create a scalar BTree index on i so all current fragments are indexed.
+    dataset
+        .create_index(
+            &["i"],
+            IndexType::Scalar,
+            None,
+            &ScalarIndexParams::default(),
+            true,
+        )
+        .await
+        .unwrap();
+
+    let original_fragments = dataset.fragments().clone();
+    assert_eq!(original_fragments.len(), 5);
+
+    // Filter i != 30 is implemented as NOT(i = 30). The scalar index evaluates the
+    // equality as an exact allow-list and then negates it to an exact block-list
+    // containing only the single row with i = 30. Since no fragment is fully
+    // blocked in the resulting RowAddrMask::BlockList, scalar_indexed_prune_fragments
+    // must keep all fragments and preserve their manifest order.
+    let pruned = dataset.prune_fragments("i != 30").await.unwrap();
+
+    assert_eq!(pruned.len(), original_fragments.len());
+    let original_ids: Vec<u64> = original_fragments.iter().map(|f| f.id).collect();
+    let pruned_ids: Vec<u64> = pruned.iter().map(|f| f.id).collect();
+    assert_eq!(pruned_ids, original_ids);
+}
+
+#[tokio::test]
+async fn test_prune_fragments_with_scalar_index_blocklist_empty_keeps_all_fragments() {
+    // Dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49].
+    let test_uri = TempStrDir::default();
+    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+        "i",
+        DataType::Int32,
+        false,
+    )]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(Int32Array::from_iter_values(0..50))],
+    )
+    .unwrap();
+
+    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+    let write_params = WriteParams {
+        max_rows_per_file: 10,
+        max_rows_per_group: 10,
+        ..Default::default()
+    };
+    let mut dataset = Dataset::write(reader, &test_uri, Some(write_params))
+        .await
+        .unwrap();
+
+    // Create a scalar BTree index on i so all current fragments are indexed.
+    dataset
+        .create_index(
+            &["i"],
+            IndexType::Scalar,
+            None,
+            &ScalarIndexParams::default(),
+            true,
+        )
+        .await
+        .unwrap();
+
+    let original_fragments = dataset.fragments().clone();
+    assert_eq!(original_fragments.len(), 5);
+
+    // Filter i != 1000 is implemented as NOT(i = 1000). The equality matches no
+    // rows, so the negated scalar index result is an exact block-list with an
+    // empty RowAddrTreeMap. scalar_indexed_prune_fragments treats an empty
+    // RowAddrMask::BlockList as "no fragment is blocked" and returns all
+    // fragments unchanged.
+    let pruned = dataset.prune_fragments("i != 1000").await.unwrap();
+
+    assert_eq!(pruned.len(), original_fragments.len());
+    let original_ids: Vec<u64> = original_fragments.iter().map(|f| f.id).collect();
+    let pruned_ids: Vec<u64> = pruned.iter().map(|f| f.id).collect();
+    assert_eq!(pruned_ids, original_ids);
+}
+
+#[tokio::test]
+async fn test_prune_fragments_with_scalar_index_blocklist_full_blocks_some_fragments() {
+    // Dataset with 5 fragments of 10 rows each. Fragment 0 has only i = 30 so that
+    // i = 30 selects the entire first fragment, while other fragments have distinct
+    // values. From the data perspective this is a "full-block" fragment for the
+    // equality side of the predicate.
+    let test_uri = TempStrDir::default();
+    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+        "i",
+        DataType::Int32,
+        false,
+    )]));
+
+    // Build five batches of 10 identical values each so fragment boundaries align
+    // with constant-value regions under the 10-row write parameters.
+    let make_batch = |value: i32| {
+        RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(Int32Array::from(vec![value; 10]))],
+        )
+        .unwrap()
+    };
+
+    let batches = vec![
+        Ok(make_batch(30)), // fragment 0: all 30
+        Ok(make_batch(10)), // fragment 1: all 10
+        Ok(make_batch(20)), // fragment 2: all 20
+        Ok(make_batch(40)), // fragment 3: all 40
+        Ok(make_batch(50)), // fragment 4: all 50
+    ];
+
+    let reader = RecordBatchIterator::new(batches.into_iter(), schema.clone());
+    let write_params = WriteParams {
+        max_rows_per_file: 10,
+        max_rows_per_group: 10,
+        ..Default::default()
+    };
+    let mut dataset = Dataset::write(reader, &test_uri, Some(write_params))
+        .await
+        .unwrap();
+
+    // Create a scalar BTree index on i so all fragments are indexed.
+    dataset
+        .create_index(
+            &["i"],
+            IndexType::Scalar,
+            None,
+            &ScalarIndexParams::default(),
+            true,
+        )
+        .await
+        .unwrap();
+
+    let original_fragments = dataset.fragments().clone();
+    assert_eq!(original_fragments.len(), 5);
+
+    // Filter i != 30 is implemented as NOT(i = 30). The equality side selects
+    // every row in fragment 0 and no rows in other fragments. Today the scalar
+    // index expression represents this as a BlockList with a *partial* bitmap
+    // for fragment 0 (RowAddrSelection::Partial), so scalar_indexed_prune_fragments
+    // treats it as a partially blocked fragment and keeps all fragments.
+    let pruned = dataset.prune_fragments("i != 30").await.unwrap();
+
+    assert_eq!(pruned.len(), original_fragments.len());
+    let original_ids: Vec<u64> = original_fragments.iter().map(|f| f.id).collect();
+    let pruned_ids: Vec<u64> = pruned.iter().map(|f| f.id).collect();
+    assert_eq!(pruned_ids, original_ids);
+}
- let test_uri = TempStrDir::default(); - let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( - "i", - DataType::Int32, - false, - )])); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(Int32Array::from_iter_values(0..50))], - ) - .unwrap(); - - let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - let write_params = WriteParams { - max_rows_per_file: 10, - max_rows_per_group: 10, - ..Default::default() - }; - let mut dataset = Dataset::write(reader, &test_uri, Some(write_params)) - .await - .unwrap(); - - // Create a scalar index on i so all current fragments are indexed. - dataset - .create_index( - &["i"], - IndexType::Scalar, - None, - &ScalarIndexParams::default(), - true, - ) - .await - .unwrap(); - - let fragments = dataset.fragments().clone(); - assert_eq!(fragments.len(), 5); - - // For filter i >= 30, all matching rows live in the last two fragments. - let expected_tail_ids: Vec = fragments[fragments.len() - 2..] - .iter() - .map(|f| f.id) - .collect(); - - let pruned = dataset.prune_fragments("i >= 30").await.unwrap(); - let pruned_ids: Vec = pruned.iter().map(|f| f.id).collect(); - - assert_eq!(pruned_ids, expected_tail_ids); -} - -#[tokio::test] -async fn test_prune_fragments_with_scalar_index_and_mixed_or_filter_is_noop() { - // Multi-column dataset with predictable fragment boundaries: 5 fragments - // of 10 rows each, columns col_a, col_b, col_c. - let test_uri = TempStrDir::default(); - let schema = Arc::new(ArrowSchema::new(vec![ - ArrowField::new("col_a", DataType::Int32, false), - ArrowField::new("col_b", DataType::Int32, false), - ArrowField::new("col_c", DataType::Int32, false), - ])); - - // col_a: 0..50 (monotonic sequence for range queries) - let col_a = Int32Array::from_iter_values(0..50); - // col_b: first fragment has small values (< 10) so rows there can only - // match the filter via the non-indexed side `col_b < 10`; later fragments - // have large values. - let col_b = Int32Array::from_iter_values((0..50).map(|i| if i < 10 { i } else { 100 + i })); - // col_c: arbitrary third column, no index. - let col_c = Int32Array::from_iter_values((0..50).map(|i| i * 2)); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(col_a), Arc::new(col_b), Arc::new(col_c)], - ) - .unwrap(); - - let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - let write_params = WriteParams { - max_rows_per_file: 10, - max_rows_per_group: 10, - ..Default::default() - }; - let mut dataset = Dataset::write(reader, &test_uri, Some(write_params)) - .await - .unwrap(); - - // Create a scalar index only on col_a. - dataset - .create_index( - &["col_a"], - IndexType::Scalar, - None, - &ScalarIndexParams::default(), - true, - ) - .await - .unwrap(); - - let fragments = dataset.fragments().clone(); - assert_eq!(fragments.len(), 5); - - // For filter `col_a >= 10 OR col_b < 10`, only `col_a` is indexable. The - // planner should treat this OR as mixed indexability and fall back to a - // refine-only filter plan, so scalar-index-based pruning becomes a no-op - // and all fragments are retained. 
- let pruned = dataset - .prune_fragments("col_a >= 10 OR col_b < 10") - .await - .unwrap(); - let original_ids: Vec = fragments.iter().map(|f| f.id).collect(); - let pruned_ids: Vec = pruned.iter().map(|f| f.id).collect(); - assert_eq!(pruned_ids, original_ids); -} - -#[tokio::test] -async fn test_prune_fragments_keeps_fragments_outside_index_coverage() { - // Use TestVectorDataset to construct a multi-fragment dataset with an Int32 column "i". - // This matches the pattern used elsewhere in dataset_index.rs for multi-fragment tests. - let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false) - .await - .unwrap(); - - // Build a scalar index on i covering the initial fragments. - test_ds.make_scalar_index().await.unwrap(); - - let before_fragments = test_ds.dataset.fragments().clone(); - let before_ids: HashSet = before_fragments.iter().map(|f| f.id).collect(); - - // Append new data so the new fragment is not covered by the existing index. - test_ds.append_new_data().await.unwrap(); - let all_fragments = test_ds.dataset.fragments().clone(); - let new_fragments: Vec<_> = all_fragments - .iter() - .filter(|f| !before_ids.contains(&f.id)) - .collect(); - - // Sanity check: we expect exactly one newly appended fragment. - assert_eq!(new_fragments.len(), 1); - let new_fragment_id = new_fragments[0].id; - - // Use a filter that only matches early rows, which live in the original fragments. - let pruned = test_ds.dataset.prune_fragments("i < 10").await.unwrap(); - let pruned_ids: HashSet = pruned.iter().map(|f| f.id).collect(); - - // Fragments without index coverage must not be pruned. - assert!(pruned_ids.contains(&new_fragment_id)); -} - -#[tokio::test] -async fn test_prune_fragments_with_zonemap_scalar_index_prunes_non_matching_fragments() { - // Dataset with 5 fragments of 10 rows each: z = [0, 1, ..., 49]. - let test_uri = TempStrDir::default(); - let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( - "z", - DataType::Int32, - false, - )])); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(Int32Array::from_iter_values(0..50))], - ) - .unwrap(); - - let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - let write_params = WriteParams { - max_rows_per_file: 10, - max_rows_per_group: 10, - ..Default::default() - }; - let mut dataset = Dataset::write(reader, &test_uri, Some(write_params)) - .await - .unwrap(); - - // Create a ZoneMap scalar index on z so all current fragments are indexed. - let zonemap_params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap); - dataset - .create_index(&["z"], IndexType::Scalar, None, &zonemap_params, true) - .await - .unwrap(); - - let fragments = dataset.fragments().clone(); - assert_eq!(fragments.len(), 5); - - // For filter z >= 30, all matching rows live in the last two fragments. - // ZoneMap returns an AtMost allow-list mask, and scalar_indexed_prune_fragments - // prunes covered fragments that have no allowed rows while keeping uncovered - // fragments, so we expect only the tail fragments to remain. - let expected_tail_ids: Vec = fragments[fragments.len() - 2..] - .iter() - .map(|f| f.id) - .collect(); - - let pruned = dataset.prune_fragments("z >= 30").await.unwrap(); - let pruned_ids: Vec = pruned.iter().map(|f| f.id).collect(); - - assert_eq!(pruned_ids, expected_tail_ids); -} - -#[tokio::test] -async fn test_prune_fragments_with_scalar_index_blocklist_partial_keeps_all_fragments() { - // Dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49]. 
- let test_uri = TempStrDir::default(); - let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( - "i", - DataType::Int32, - false, - )])); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(Int32Array::from_iter_values(0..50))], - ) - .unwrap(); - - let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - let write_params = WriteParams { - max_rows_per_file: 10, - max_rows_per_group: 10, - ..Default::default() - }; - let mut dataset = Dataset::write(reader, &test_uri, Some(write_params)) - .await - .unwrap(); - - // Create a scalar BTree index on i so all current fragments are indexed. - dataset - .create_index( - &["i"], - IndexType::Scalar, - None, - &ScalarIndexParams::default(), - true, - ) - .await - .unwrap(); - - let original_fragments = dataset.fragments().clone(); - assert_eq!(original_fragments.len(), 5); - - // Filter i != 30 is implemented as NOT(i = 30). The scalar index evaluates the - // equality as an exact allow-list and then negates it to an exact block-list - // containing only the single row with i = 30. Since no fragment is fully - // blocked in the resulting RowAddrMask::BlockList, scalar_indexed_prune_fragments - // must keep all fragments and preserve their manifest order. - let pruned = dataset.prune_fragments("i != 30").await.unwrap(); - - assert_eq!(pruned.len(), original_fragments.len()); - let original_ids: Vec = original_fragments.iter().map(|f| f.id).collect(); - let pruned_ids: Vec = pruned.iter().map(|f| f.id).collect(); - assert_eq!(pruned_ids, original_ids); -} - -#[tokio::test] -async fn test_prune_fragments_with_scalar_index_blocklist_empty_keeps_all_fragments() { - // Dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49]. - let test_uri = TempStrDir::default(); - let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( - "i", - DataType::Int32, - false, - )])); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(Int32Array::from_iter_values(0..50))], - ) - .unwrap(); - - let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); - let write_params = WriteParams { - max_rows_per_file: 10, - max_rows_per_group: 10, - ..Default::default() - }; - let mut dataset = Dataset::write(reader, &test_uri, Some(write_params)) - .await - .unwrap(); - - // Create a scalar BTree index on i so all current fragments are indexed. - dataset - .create_index( - &["i"], - IndexType::Scalar, - None, - &ScalarIndexParams::default(), - true, - ) - .await - .unwrap(); - - let original_fragments = dataset.fragments().clone(); - assert_eq!(original_fragments.len(), 5); - - // Filter i != 1000 is implemented as NOT(i = 1000). The equality matches no - // rows, so the negated scalar index result is an exact block-list with an - // empty RowAddrTreeMap. scalar_indexed_prune_fragments treats an empty - // RowAddrMask::BlockList as "no fragment is blocked" and returns all - // fragments unchanged. - let pruned = dataset.prune_fragments("i != 1000").await.unwrap(); - - assert_eq!(pruned.len(), original_fragments.len()); - let original_ids: Vec = original_fragments.iter().map(|f| f.id).collect(); - let pruned_ids: Vec = pruned.iter().map(|f| f.id).collect(); - assert_eq!(pruned_ids, original_ids); -} - -#[tokio::test] -async fn test_prune_fragments_with_scalar_index_blocklist_full_blocks_some_fragments() { - // Dataset with 5 fragments of 10 rows each. Fragment 0 has only i = 30 so that - // i = 30 selects the entire first fragment, while other fragments have distinct - // values. 
From the data perspective this is a "full-block" fragment for the - // equality side of the predicate. - let test_uri = TempStrDir::default(); - let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( - "i", - DataType::Int32, - false, - )])); - - // Build five batches of 10 identical values each so fragment boundaries align - // with constant-value regions under the 10-row write parameters. - let make_batch = |value: i32| { - RecordBatch::try_new( - schema.clone(), - vec![Arc::new(Int32Array::from(vec![value; 10]))], - ) - .unwrap() - }; - - let batches = vec![ - Ok(make_batch(30)), // fragment 0: all 30 - Ok(make_batch(10)), // fragment 1: all 10 - Ok(make_batch(20)), // fragment 2: all 20 - Ok(make_batch(40)), // fragment 3: all 40 - Ok(make_batch(50)), // fragment 4: all 50 - ]; - - let reader = RecordBatchIterator::new(batches.into_iter(), schema.clone()); - let write_params = WriteParams { - max_rows_per_file: 10, - max_rows_per_group: 10, - ..Default::default() - }; - let mut dataset = Dataset::write(reader, &test_uri, Some(write_params)) - .await - .unwrap(); - - // Create a scalar BTree index on i so all fragments are indexed. - dataset - .create_index( - &["i"], - IndexType::Scalar, - None, - &ScalarIndexParams::default(), - true, - ) - .await - .unwrap(); - - let original_fragments = dataset.fragments().clone(); - assert_eq!(original_fragments.len(), 5); - - // Filter i != 30 is implemented as NOT(i = 30). The equality side selects - // every row in fragment 0 and no rows in other fragments. Today the scalar - // index expression represents this as a BlockList with a *partial* bitmap - // for fragment 0 (RowAddrSelection::Partial), so scalar_indexed_prune_fragments - // treats it as a partially blocked fragment and keeps all fragments. 
- let pruned = dataset.prune_fragments("i != 30").await.unwrap(); - - assert_eq!(pruned.len(), original_fragments.len()); - let original_ids: Vec = original_fragments.iter().map(|f| f.id).collect(); - let pruned_ids: Vec = pruned.iter().map(|f| f.id).collect(); - assert_eq!(pruned_ids, original_ids); -} diff --git a/rust/lance/src/dataset/tests/dataset_scanner.rs b/rust/lance/src/dataset/tests/dataset_scanner.rs index 9fce5f6d2ca..bb922e03947 100644 --- a/rust/lance/src/dataset/tests/dataset_scanner.rs +++ b/rust/lance/src/dataset/tests/dataset_scanner.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::collections::HashSet; use std::sync::Arc; use std::vec; @@ -17,7 +18,7 @@ use lance_arrow::SchemaExt; use lance_index::scalar::inverted::{ query::PhraseQuery, tokenizer::InvertedIndexParams, SCORE_FIELD, }; -use lance_index::scalar::FullTextSearchQuery; +use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery, ScalarIndexParams}; use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; @@ -28,6 +29,10 @@ use lance_index::vector::ivf::IvfBuildParams; use lance_index::vector::pq::PQBuildParams; use lance_index::vector::Query; use pretty_assertions::assert_eq; +use lance_core::utils::tempfile::TempStrDir; +use lance_encoding::version::LanceFileVersion; +use crate::dataset::scanner::test_dataset::TestVectorDataset; +use crate::dataset::WriteParams; #[tokio::test] async fn test_vector_filter_fts_search() { @@ -465,3 +470,351 @@ async fn check_results( .unwrap(); assert_eq!(ids.values(), expected_ids); } + + +#[tokio::test] +async fn test_prune_fragments_without_scalar_index_returns_all() { + // Build a dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49]. + let test_uri = TempStrDir::default(); + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..50))], + ) + .unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let write_params = WriteParams { + max_rows_per_file: 10, + max_rows_per_group: 10, + ..Default::default() + }; + let dataset = Dataset::write(reader, &test_uri, Some(write_params)) + .await + .unwrap(); + + let original_fragments = dataset.fragments().clone(); + + // Without a scalar index, pruning should be a no-op and return all fragments. + let pruned = dataset.prune_fragments("i >= 30").await.unwrap(); + + std::assert_eq!(pruned.len(), original_fragments.len()); + let original_ids: Vec = original_fragments.iter().map(|f| f.id).collect(); + let pruned_ids: Vec = pruned.iter().map(|f| f.id).collect(); + std::assert_eq!(pruned_ids, original_ids); +} + +#[tokio::test] +async fn test_prune_fragments_with_scalar_index_prunes_non_matching_fragments() { + // Dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49]. 
+ let test_uri = TempStrDir::default(); + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..50))], + ) + .unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let write_params = WriteParams { + max_rows_per_file: 10, + max_rows_per_group: 10, + ..Default::default() + }; + let mut dataset = Dataset::write(reader, &test_uri, Some(write_params)) + .await + .unwrap(); + + // Create a scalar index on i so all current fragments are indexed. + dataset + .create_index( + &["i"], + IndexType::Scalar, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + let fragments = dataset.fragments().clone(); + std::assert_eq!(fragments.len(), 5); + + // For filter i >= 30, all matching rows live in the last two fragments. + let expected_tail_ids: Vec = fragments[fragments.len() - 2..] + .iter() + .map(|f| f.id) + .collect(); + + let pruned = dataset.prune_fragments("i >= 30").await.unwrap(); + let pruned_ids: Vec = pruned.iter().map(|f| f.id).collect(); + + std::assert_eq!(pruned_ids, expected_tail_ids); +} + +#[tokio::test] +async fn test_prune_fragments_with_scalar_index_and_mixed_or_filter_is_noop() { + // Multi-column dataset with predictable fragment boundaries: 5 fragments + // of 10 rows each, columns col_a, col_b, col_c. + let test_uri = TempStrDir::default(); + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("col_a", DataType::Int32, false), + ArrowField::new("col_b", DataType::Int32, false), + ArrowField::new("col_c", DataType::Int32, false), + ])); + + // col_a: 0..50 (monotonic sequence for range queries) + let col_a = Int32Array::from_iter_values(0..50); + // col_b: first fragment has small values (< 10) so rows there can only + // match the filter via the non-indexed side `col_b < 10`; later fragments + // have large values. + let col_b = Int32Array::from_iter_values((0..50).map(|i| if i < 10 { i } else { 100 + i })); + // col_c: arbitrary third column, no index. + let col_c = Int32Array::from_iter_values((0..50).map(|i| i * 2)); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(col_a), Arc::new(col_b), Arc::new(col_c)], + ) + .unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let write_params = WriteParams { + max_rows_per_file: 10, + max_rows_per_group: 10, + ..Default::default() + }; + let mut dataset = Dataset::write(reader, &test_uri, Some(write_params)) + .await + .unwrap(); + + // Create a scalar index only on col_a. + dataset + .create_index( + &["col_a"], + IndexType::Scalar, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + + let fragments = dataset.fragments().clone(); + std::assert_eq!(fragments.len(), 5); + + // For filter `col_a >= 10 OR col_b < 10`, only `col_a` is indexable. The + // planner should treat this OR as mixed indexability and fall back to a + // refine-only filter plan, so scalar-index-based pruning becomes a no-op + // and all fragments are retained. 
+    let pruned = dataset
+        .prune_fragments("col_a >= 10 OR col_b < 10")
+        .await
+        .unwrap();
+    let original_ids: Vec<u64> = fragments.iter().map(|f| f.id).collect();
+    let pruned_ids: Vec<u64> = pruned.iter().map(|f| f.id).collect();
+    std::assert_eq!(pruned_ids, original_ids);
+}
+
+#[tokio::test]
+async fn test_prune_fragments_keeps_fragments_outside_index_coverage() {
+    // Use TestVectorDataset to construct a multi-fragment dataset with an Int32 column "i".
+    // This matches the pattern used elsewhere in dataset_index.rs for multi-fragment tests.
+    let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
+        .await
+        .unwrap();
+
+    // Build a scalar index on i covering the initial fragments.
+    test_ds.make_scalar_index().await.unwrap();
+
+    let before_fragments = test_ds.dataset.fragments().clone();
+    let before_ids: HashSet<u64> = before_fragments.iter().map(|f| f.id).collect();
+
+    // Append new data so the new fragment is not covered by the existing index.
+    test_ds.append_new_data().await.unwrap();
+    let all_fragments = test_ds.dataset.fragments().clone();
+    let new_fragments: Vec<_> = all_fragments
+        .iter()
+        .filter(|f| !before_ids.contains(&f.id))
+        .collect();
+
+    // Sanity check: we expect exactly one newly appended fragment.
+    std::assert_eq!(new_fragments.len(), 1);
+    let new_fragment_id = new_fragments[0].id;
+
+    // Use a filter that only matches early rows, which live in the original fragments.
+    let pruned = test_ds.dataset.prune_fragments("i < 10").await.unwrap();
+    let pruned_ids: HashSet<u64> = pruned.iter().map(|f| f.id).collect();
+
+    // Fragments without index coverage must not be pruned.
+    assert!(pruned_ids.contains(&new_fragment_id));
+}
+
+#[tokio::test]
+async fn test_prune_fragments_with_zonemap_scalar_index_prunes_non_matching_fragments() {
+    // Dataset with 5 fragments of 10 rows each: z = [0, 1, ..., 49].
+    let test_uri = TempStrDir::default();
+    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+        "z",
+        DataType::Int32,
+        false,
+    )]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(Int32Array::from_iter_values(0..50))],
+    )
+        .unwrap();
+
+    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+    let write_params = WriteParams {
+        max_rows_per_file: 10,
+        max_rows_per_group: 10,
+        ..Default::default()
+    };
+    let mut dataset = Dataset::write(reader, &test_uri, Some(write_params))
+        .await
+        .unwrap();
+
+    // Create a ZoneMap scalar index on z so all current fragments are indexed.
+    let zonemap_params = ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap);
+    dataset
+        .create_index(&["z"], IndexType::Scalar, None, &zonemap_params, true)
+        .await
+        .unwrap();
+
+    let fragments = dataset.fragments().clone();
+    std::assert_eq!(fragments.len(), 5);
+
+    // For filter z >= 30, all matching rows live in the last two fragments.
+    // ZoneMap returns an AtMost allow-list mask, and scalar_indexed_prune_fragments
+    // prunes covered fragments that have no allowed rows while keeping uncovered
+    // fragments, so we expect only the tail fragments to remain.
+    let expected_tail_ids: Vec<u64> = fragments[fragments.len() - 2..]
+        .iter()
+        .map(|f| f.id)
+        .collect();
+
+    let pruned = dataset.prune_fragments("z >= 30").await.unwrap();
+    let pruned_ids: Vec<u64> = pruned.iter().map(|f| f.id).collect();
+
+    std::assert_eq!(pruned_ids, expected_tail_ids);
+}
+
+#[tokio::test]
+async fn test_prune_fragments_with_scalar_index_blocklist_partial_keeps_all_fragments() {
+    // Dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49].
+    let test_uri = TempStrDir::default();
+    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+        "i",
+        DataType::Int32,
+        false,
+    )]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(Int32Array::from_iter_values(0..50))],
+    )
+        .unwrap();
+
+    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+    let write_params = WriteParams {
+        max_rows_per_file: 10,
+        max_rows_per_group: 10,
+        ..Default::default()
+    };
+    let mut dataset = Dataset::write(reader, &test_uri, Some(write_params))
+        .await
+        .unwrap();
+
+    // Create a scalar BTree index on i so all current fragments are indexed.
+    dataset
+        .create_index(
+            &["i"],
+            IndexType::Scalar,
+            None,
+            &ScalarIndexParams::default(),
+            true,
+        )
+        .await
+        .unwrap();
+
+    let original_fragments = dataset.fragments().clone();
+    std::assert_eq!(original_fragments.len(), 5);
+
+    // Filter i != 30 is implemented as NOT(i = 30). The scalar index evaluates the
+    // equality as an exact allow-list and then negates it to an exact block-list
+    // containing only the single row with i = 30. Since no fragment is fully
+    // blocked in the resulting RowAddrMask::BlockList, scalar_indexed_prune_fragments
+    // must keep all fragments and preserve their manifest order.
+    let pruned = dataset.prune_fragments("i != 30").await.unwrap();
+
+    std::assert_eq!(pruned.len(), original_fragments.len());
+    let original_ids: Vec<u64> = original_fragments.iter().map(|f| f.id).collect();
+    let pruned_ids: Vec<u64> = pruned.iter().map(|f| f.id).collect();
+    std::assert_eq!(pruned_ids, original_ids);
+}
+
+#[tokio::test]
+async fn test_prune_fragments_with_scalar_index_blocklist_empty_keeps_all_fragments() {
+    // Dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49].
+    let test_uri = TempStrDir::default();
+    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
+        "i",
+        DataType::Int32,
+        false,
+    )]));
+
+    let batch = RecordBatch::try_new(
+        schema.clone(),
+        vec![Arc::new(Int32Array::from_iter_values(0..50))],
+    )
+        .unwrap();
+
+    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
+    let write_params = WriteParams {
+        max_rows_per_file: 10,
+        max_rows_per_group: 10,
+        ..Default::default()
+    };
+    let mut dataset = Dataset::write(reader, &test_uri, Some(write_params))
+        .await
+        .unwrap();
+
+    // Create a scalar BTree index on i so all current fragments are indexed.
+    dataset
+        .create_index(
+            &["i"],
+            IndexType::Scalar,
+            None,
+            &ScalarIndexParams::default(),
+            true,
+        )
+        .await
+        .unwrap();
+
+    let original_fragments = dataset.fragments().clone();
+    std::assert_eq!(original_fragments.len(), 5);
+
+    // Filter i != 1000 is implemented as NOT(i = 1000). The equality matches no
+    // rows, so the negated scalar index result is an exact block-list with an
+    // empty RowAddrTreeMap. scalar_indexed_prune_fragments treats an empty
+    // RowAddrMask::BlockList as "no fragment is blocked" and returns all
+    // fragments unchanged.
+ let pruned = dataset.prune_fragments("i != 1000").await.unwrap(); + + std::assert_eq!(pruned.len(), original_fragments.len()); + let original_ids: Vec = original_fragments.iter().map(|f| f.id).collect(); + let pruned_ids: Vec = pruned.iter().map(|f| f.id).collect(); + std::assert_eq!(pruned_ids, original_ids); +} From 95dbef53fae29af27b6d539fa4ac39c05ba2bacd Mon Sep 17 00:00:00 2001 From: "fangbo.0511" Date: Wed, 7 Jan 2026 21:39:52 +0800 Subject: [PATCH 12/12] format code --- .../src/dataset/tests/dataset_scanner.rs | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/rust/lance/src/dataset/tests/dataset_scanner.rs b/rust/lance/src/dataset/tests/dataset_scanner.rs index bb922e03947..989438fd3f1 100644 --- a/rust/lance/src/dataset/tests/dataset_scanner.rs +++ b/rust/lance/src/dataset/tests/dataset_scanner.rs @@ -22,17 +22,17 @@ use lance_index::scalar::{BuiltinIndexType, FullTextSearchQuery, ScalarIndexPara use lance_index::{vector::DIST_COL, DatasetIndexExt, IndexType}; use lance_linalg::distance::MetricType; +use crate::dataset::scanner::test_dataset::TestVectorDataset; use crate::dataset::scanner::{DatasetRecordBatchStream, QueryFilter}; +use crate::dataset::WriteParams; use crate::Dataset; +use lance_core::utils::tempfile::TempStrDir; +use lance_encoding::version::LanceFileVersion; use lance_index::scalar::inverted::query::FtsQuery; use lance_index::vector::ivf::IvfBuildParams; use lance_index::vector::pq::PQBuildParams; use lance_index::vector::Query; use pretty_assertions::assert_eq; -use lance_core::utils::tempfile::TempStrDir; -use lance_encoding::version::LanceFileVersion; -use crate::dataset::scanner::test_dataset::TestVectorDataset; -use crate::dataset::WriteParams; #[tokio::test] async fn test_vector_filter_fts_search() { @@ -471,7 +471,6 @@ async fn check_results( assert_eq!(ids.values(), expected_ids); } - #[tokio::test] async fn test_prune_fragments_without_scalar_index_returns_all() { // Build a dataset with 5 fragments of 10 rows each: i = [0, 1, ..., 49]. 
@@ -486,7 +485,7 @@ async fn test_prune_fragments_without_scalar_index_returns_all() {
         schema.clone(),
         vec![Arc::new(Int32Array::from_iter_values(0..50))],
     )
-        .unwrap();
+    .unwrap();
 
     let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
     let write_params = WriteParams {
@@ -523,7 +522,7 @@ async fn test_prune_fragments_with_scalar_index_prunes_non_matching_fragments()
         schema.clone(),
         vec![Arc::new(Int32Array::from_iter_values(0..50))],
     )
-        .unwrap();
+    .unwrap();
 
     let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
     let write_params = WriteParams {
@@ -586,7 +585,7 @@ async fn test_prune_fragments_with_scalar_index_and_mixed_or_filter_is_noop() {
         schema.clone(),
         vec![Arc::new(col_a), Arc::new(col_b), Arc::new(col_c)],
     )
-        .unwrap();
+    .unwrap();
 
     let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
     let write_params = WriteParams {
@@ -674,7 +673,7 @@ async fn test_prune_fragments_with_zonemap_scalar_index_prunes_non_matching_frag
         schema.clone(),
         vec![Arc::new(Int32Array::from_iter_values(0..50))],
     )
-        .unwrap();
+    .unwrap();
 
     let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
     let write_params = WriteParams {
@@ -725,7 +724,7 @@ async fn test_prune_fragments_with_scalar_index_blocklist_partial_keeps_all_frag
         schema.clone(),
         vec![Arc::new(Int32Array::from_iter_values(0..50))],
     )
-        .unwrap();
+    .unwrap();
 
     let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
     let write_params = WriteParams {
@@ -779,7 +778,7 @@ async fn test_prune_fragments_with_scalar_index_blocklist_empty_keeps_all_fragme
         schema.clone(),
         vec![Arc::new(Int32Array::from_iter_values(0..50))],
    )
-        .unwrap();
+    .unwrap();
 
     let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
     let write_params = WriteParams {
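
For context, a minimal usage sketch of the new API (not part of the patches above; the URI and filter string are illustrative assumptions, and the scan is driven through the existing Scanner builder methods):

    use lance::dataset::Dataset;
    use lance::Result;

    async fn scan_with_pruning() -> Result<()> {
        // Prune first: keep only the fragments the scalar indices cannot rule out.
        let dataset = Dataset::open("/tmp/demo.lance").await?; // assumed path
        let surviving = dataset.prune_fragments("i >= 30").await?;

        // Pruning is fragment-granular and may over-approximate (AtMost masks keep
        // any fragment that might match), so the row-level filter must still be
        // applied during the scan itself.
        let mut scanner = dataset.scan();
        scanner.with_fragments(surviving).filter("i >= 30")?;
        let _stream = scanner.try_into_stream().await?;
        Ok(())
    }

Re-applying the filter in the scan is the point of the design: prune_fragments is an I/O optimization that narrows which fragments are opened, never a correctness filter on its own.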