From 20296ebb3ccd0dd1236169cab4f6aa08be2c9c1e Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Sat, 10 Jan 2026 19:11:05 -0800
Subject: [PATCH] feat: add WriteStats to high-level write operations

Add a unified WriteStats struct to track statistics across Insert, Update,
Delete, and Merge-Insert operations. This enables users to get information
about rows written/updated/deleted, files written, and bytes written.

- InsertBuilder::execute() now returns InsertResult with stats
- UpdateResult now includes WriteStats
- DeleteBuilder::execute() now returns DeleteResult with stats
- MergeStats gains write_stats() accessor for compatibility

Co-Authored-By: Claude Opus 4.5
---
 rust/lance-table/src/io/deletion.rs           |  41 +--
 rust/lance/src/dataset.rs                     |  17 +-
 rust/lance/src/dataset/fragment.rs            |  24 +-
 .../src/dataset/tests/dataset_merge_update.rs |  15 +-
 .../src/dataset/tests/dataset_transactions.rs |   9 +-
 rust/lance/src/dataset/write.rs               |  36 ++-
 rust/lance/src/dataset/write/commit.rs        |  12 +-
 rust/lance/src/dataset/write/delete.rs        | 250 ++++++++++++------
 rust/lance/src/dataset/write/insert.rs        |  77 ++++--
 rust/lance/src/dataset/write/merge_insert.rs  |  51 +++-
 .../src/dataset/write/merge_insert/exec.rs    |   4 +-
 rust/lance/src/dataset/write/update.rs        |  33 ++-
 rust/lance/src/index/vector/ivf.rs            |   9 +-
 rust/lance/src/index/vector/ivf/v2.rs         |   3 +-
 rust/lance/src/index/vector/utils.rs          |   6 +-
 rust/lance/src/io/commit/conflict_resolver.rs |   9 +-
 rust/lance/src/io/commit/s3_test.rs           |   3 +-
 rust/lance/src/io/exec/filtered_read.rs       |   3 +-
 18 files changed, 433 insertions(+), 169 deletions(-)

diff --git a/rust/lance-table/src/io/deletion.rs b/rust/lance-table/src/io/deletion.rs
index 5dd0028bfd7..1975b80771f 100644
--- a/rust/lance-table/src/io/deletion.rs
+++ b/rust/lance-table/src/io/deletion.rs
@@ -59,17 +59,17 @@ pub fn relative_deletion_file_path(fragment_id: u64, deletion_file: &DeletionFil
 
 /// Write a deletion file for a fragment for a given deletion vector.
 ///
-/// Returns the deletion file if one was written. If no deletions were present,
-/// returns `Ok(None)`.
+/// Returns a tuple of (deletion_file, bytes_written). If no deletions were present,
+/// returns `Ok((None, 0))`.
 pub async fn write_deletion_file(
     base: &Path,
     fragment_id: u64,
     read_version: u64,
     removed_rows: &DeletionVector,
     object_store: &ObjectStore,
-) -> Result<Option<DeletionFile>> {
-    let deletion_file = match removed_rows {
-        DeletionVector::NoDeletions => None,
+) -> Result<(Option<DeletionFile>, u64)> {
+    let (deletion_file, bytes_written) = match removed_rows {
+        DeletionVector::NoDeletions => (None, 0),
         DeletionVector::Set(set) => {
             let id = rand::rng().random::<u64>();
             let deletion_file = DeletionFile {
@@ -101,11 +101,12 @@ pub async fn write_deletion_file(
             // Drop writer so out is no longer borrowed.
} + let bytes_written = out.len() as u64; object_store.put(&path, &out).await?; info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DELETION, path = path.to_string()); - Some(deletion_file) + (Some(deletion_file), bytes_written) } DeletionVector::Bitmap(bitmap) => { let id = rand::rng().random::(); @@ -121,14 +122,15 @@ pub async fn write_deletion_file( let mut out: Vec = Vec::new(); bitmap.serialize_into(&mut out)?; + let bytes_written = out.len() as u64; object_store.put(&path, &out).await?; info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DELETION, path = path.to_string()); - Some(deletion_file) + (Some(deletion_file), bytes_written) } }; - Ok(deletion_file) + Ok((deletion_file, bytes_written)) } #[instrument( @@ -235,10 +237,11 @@ mod test { let (object_store, path) = ObjectStore::from_uri("memory:///no_deletion") .await .unwrap(); - let file = write_deletion_file(&path, 0, 0, &dv, &object_store) + let (file, bytes_written) = write_deletion_file(&path, 0, 0, &dv, &object_store) .await .unwrap(); assert!(file.is_none()); + assert_eq!(bytes_written, 0); } #[tokio::test] @@ -250,9 +253,10 @@ mod test { let object_store = ObjectStore::memory(); let path = Path::from("/write"); - let file = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store) - .await - .unwrap(); + let (file, bytes_written) = + write_deletion_file(&path, fragment_id, read_version, &dv, &object_store) + .await + .unwrap(); assert!(matches!( file, @@ -261,6 +265,7 @@ mod test { .. }) )); + assert!(bytes_written > 0); let file = file.unwrap(); assert_eq!(file.read_version, read_version); @@ -304,9 +309,10 @@ mod test { let object_store = ObjectStore::memory(); let path = Path::from("/bitmap"); - let file = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store) - .await - .unwrap(); + let (file, bytes_written) = + write_deletion_file(&path, fragment_id, read_version, &dv, &object_store) + .await + .unwrap(); assert!(matches!( file, @@ -315,6 +321,7 @@ mod test { .. 
}) )); + assert!(bytes_written > 0); let file = file.unwrap(); assert_eq!(file.read_version, read_version); @@ -346,7 +353,7 @@ mod test { let object_store = ObjectStore::memory(); let path = Path::from("/roundtrip"); - let file = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store) + let (file, _) = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store) .await .unwrap(); @@ -365,7 +372,7 @@ mod test { let object_store = ObjectStore::memory(); let path = Path::from("/bitmap"); - let file = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store) + let (file, _) = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store) .await .unwrap(); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 7d1f4dc8395..7126c05644d 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -124,11 +124,11 @@ pub use write::merge_insert::{ WhenNotMatched, WhenNotMatchedBySource, }; -pub use write::update::{UpdateBuilder, UpdateJob}; +pub use write::update::{UpdateBuilder, UpdateJob, UpdateResult}; #[allow(deprecated)] pub use write::{ - write_fragments, AutoCleanupParams, CommitBuilder, DeleteBuilder, InsertBuilder, - WriteDestination, WriteMode, WriteParams, + write_fragments, AutoCleanupParams, CommitBuilder, DeleteBuilder, DeleteResult, InsertBuilder, + InsertResult, WriteDestination, WriteMode, WriteParams, WriteStats, }; pub(crate) const INDICES_DIR: &str = "_indices"; @@ -793,8 +793,11 @@ impl Dataset { if let Some(params) = ¶ms { builder = builder.with_params(params); } - Box::pin(builder.execute_stream(Box::new(batches) as Box)) - .await + let result = Box::pin( + builder.execute_stream(Box::new(batches) as Box), + ) + .await?; + Ok(result.dataset) } /// Write into a namespace-managed table with automatic credential vending. @@ -969,12 +972,12 @@ impl Dataset { ..params.unwrap_or_default() }; - let new_dataset = InsertBuilder::new(WriteDestination::Dataset(Arc::new(self.clone()))) + let result = InsertBuilder::new(WriteDestination::Dataset(Arc::new(self.clone()))) .with_params(&write_params) .execute_stream(Box::new(batches) as Box) .await?; - *self = new_dataset; + *self = result.dataset; Ok(()) } diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index 099b883d355..6e08a1b0ab8 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1792,13 +1792,18 @@ impl FileFragment { return Ok(Some(self)); } - self.write_deletions(deletion_vector).await + let (fragment, _bytes_written) = self.write_deletions(deletion_vector).await?; + Ok(fragment) } + /// Extend the deletion vector with new deletions. + /// + /// Returns `(Some(fragment), bytes_written)` if the fragment still has rows, + /// or `(None, 0)` if all rows are deleted. pub async fn extend_deletions( self, new_deletions: impl IntoIterator, - ) -> Result> { + ) -> Result<(Option, u64)> { let mut deletion_vector = self .get_deletion_vector() .await? 
@@ -1811,12 +1816,15 @@ impl FileFragment { self.write_deletions(deletion_vector).await } - async fn write_deletions(mut self, deletion_vector: DeletionVector) -> Result> { + async fn write_deletions( + mut self, + deletion_vector: DeletionVector, + ) -> Result<(Option, u64)> { let physical_rows = self.physical_rows().await?; if deletion_vector.len() == physical_rows && deletion_vector.contains_range(0..physical_rows as u32) { - return Ok(None); + return Ok((None, 0)); } else if deletion_vector.len() >= physical_rows { let dv_len = deletion_vector.len(); let examples: Vec = deletion_vector @@ -1835,7 +1843,7 @@ impl FileFragment { }); } - self.metadata.deletion_file = write_deletion_file( + let (deletion_file, bytes_written) = write_deletion_file( &self.dataset.base, self.metadata.id, self.dataset.version().version, @@ -1843,8 +1851,9 @@ impl FileFragment { self.dataset.object_store(), ) .await?; + self.metadata.deletion_file = deletion_file; - Ok(Some(self)) + Ok((Some(self), bytes_written)) } } @@ -3924,7 +3933,8 @@ mod tests { .with_params(&write_params) .execute(vec![batch]) .await - .unwrap(); + .unwrap() + .dataset; let fragment = dataset.get_fragments().pop().unwrap(); // Assert file is small (< 4300 bytes) diff --git a/rust/lance/src/dataset/tests/dataset_merge_update.rs b/rust/lance/src/dataset/tests/dataset_merge_update.rs index aa35f1b6408..ef6a31a2767 100644 --- a/rust/lance/src/dataset/tests/dataset_merge_update.rs +++ b/rust/lance/src/dataset/tests/dataset_merge_update.rs @@ -1058,14 +1058,16 @@ async fn test_replace_dataset() { let mut ds = InsertBuilder::new(&test_uri) .execute(vec![data1]) .await - .unwrap(); + .unwrap() + .dataset; ds.object_store().remove_dir_all(test_path).await.unwrap(); let ds2 = InsertBuilder::new(&test_uri) .execute(vec![data2.clone()]) .await - .unwrap(); + .unwrap() + .dataset; ds.checkout_latest().await.unwrap(); let roundtripped = ds.scan().try_into_batch().await.unwrap(); @@ -1121,7 +1123,8 @@ async fn test_insert_skip_auto_cleanup() { .with_params(&write_params1) .execute_stream(data1) .await - .unwrap(); + .unwrap() + .dataset; assert_eq!(dataset2.version().version, 2); @@ -1137,7 +1140,8 @@ async fn test_insert_skip_auto_cleanup() { .with_params(&write_params1) .execute_stream(data1_extra) .await - .unwrap(); + .unwrap() + .dataset; assert_eq!(dataset2_extra.version().version, 3); @@ -1170,7 +1174,8 @@ async fn test_insert_skip_auto_cleanup() { .with_params(&write_params2) .execute_stream(data2) .await - .unwrap(); + .unwrap() + .dataset; assert_eq!(dataset3.version().version, 4); diff --git a/rust/lance/src/dataset/tests/dataset_transactions.rs b/rust/lance/src/dataset/tests/dataset_transactions.rs index 5e651a3df8c..5d9b121d6ea 100644 --- a/rust/lance/src/dataset/tests/dataset_transactions.rs +++ b/rust/lance/src/dataset/tests/dataset_transactions.rs @@ -161,7 +161,8 @@ async fn test_session_store_registry() { .with_params(&write_params) .execute(vec![batch.clone()]) .await - .unwrap(); + .unwrap() + .dataset; // Assert there is one active store. 
assert_eq!(registry.active_stores().len(), 1); @@ -172,7 +173,8 @@ async fn test_session_store_registry() { .with_params(&write_params) .execute(vec![batch.clone()]) .await - .unwrap(); + .unwrap() + .dataset; assert_eq!(registry.active_stores().len(), 1); assert_eq!( Arc::as_ptr(&dataset.object_store().inner), @@ -192,7 +194,8 @@ async fn test_session_store_registry() { .with_params(&write_params2) .execute(vec![batch.clone()]) .await - .unwrap(); + .unwrap() + .dataset; assert_eq!(registry.active_stores().len(), 2); assert_ne!( Arc::as_ptr(&dataset.object_store().inner), diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index 585596e513e..f49cb5e2c26 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -59,8 +59,40 @@ mod retry; pub mod update; pub use commit::CommitBuilder; -pub use delete::DeleteBuilder; -pub use insert::InsertBuilder; +pub use delete::{DeleteBuilder, DeleteResult}; +pub use insert::{InsertBuilder, InsertResult}; + +/// Statistics for write operations. +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct WriteStats { + /// Number of rows written (inserted). + pub rows_written: u64, + /// Number of rows updated. + pub rows_updated: u64, + /// Number of rows deleted. + pub rows_deleted: u64, + /// Number of files written. + pub files_written: u64, + /// Total bytes written to storage. + pub bytes_written: u64, +} + +impl WriteStats { + /// Calculate write stats from newly written fragments. + pub(crate) fn from_fragments(fragments: &[Fragment]) -> Self { + let mut stats = Self::default(); + for fragment in fragments { + stats.rows_written += fragment.physical_rows.unwrap_or(0) as u64; + for data_file in &fragment.files { + if let Some(size) = data_file.file_size_bytes.get() { + stats.bytes_written += u64::from(size); + } + stats.files_written += 1; + } + } + stats + } +} /// The destination to write data to. #[derive(Debug, Clone)] diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index 5070ee5e65f..3d38182428e 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -544,7 +544,8 @@ mod tests { }) .execute(vec![batch]) .await - .unwrap(); + .unwrap() + .dataset; let dataset = Arc::new(dataset); let io_stats = dataset.object_store().io_stats_incremental(); @@ -624,7 +625,8 @@ mod tests { .with_params(&write_params) .execute(vec![data]) .await - .unwrap(); + .unwrap() + .dataset; dataset.object_store().io_stats_incremental(); // Reset the stats let read_version = dataset.manifest().version; @@ -681,7 +683,8 @@ mod tests { .with_params(&write_params) .execute(vec![data]) .await - .unwrap(); + .unwrap() + .dataset; let original_dataset = Arc::new(dataset.clone()); // Create 3 other transactions that happen concurrently. 
@@ -741,7 +744,8 @@ mod tests { let dataset = InsertBuilder::new("memory://test") .execute(vec![batch]) .await - .unwrap(); + .unwrap() + .dataset; let dataset = Arc::new(dataset); // Attempting to commit empty gives error diff --git a/rust/lance/src/dataset/write/delete.rs b/rust/lance/src/dataset/write/delete.rs index 66e1da10d4b..2293cad2f75 100644 --- a/rust/lance/src/dataset/write/delete.rs +++ b/rust/lance/src/dataset/write/delete.rs @@ -20,25 +20,26 @@ use std::sync::Arc; use std::time::Duration; use super::retry::{execute_with_retry, RetryConfig, RetryExecutor}; -use super::CommitBuilder; +use super::{CommitBuilder, WriteStats}; /// Apply deletions to fragments based on a RoaringTreemap of row IDs. /// -/// Returns the set of modified fragments and removed fragments, if any. +/// Returns (modified fragments, removed fragment IDs, bytes written). async fn apply_deletions( dataset: &Dataset, removed_row_addrs: &RoaringTreemap, -) -> Result<(Vec, Vec)> { +) -> Result<(Vec, Vec, u64)> { let bitmaps = Arc::new(removed_row_addrs.bitmaps().collect::>()); enum FragmentChange { Unchanged, - Modified(Box), + Modified(Box, u64), // (fragment, bytes_written) Removed(u64), } let mut updated_fragments = Vec::new(); let mut removed_fragments = Vec::new(); + let mut total_bytes_written = 0u64; let mut stream = futures::stream::iter(dataset.get_fragments()) .map(move |fragment| { @@ -47,10 +48,11 @@ async fn apply_deletions( let fragment_id = fragment.id(); if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) { match fragment.extend_deletions(*bitmap).await { - Ok(Some(new_fragment)) => { - Ok(FragmentChange::Modified(Box::new(new_fragment.metadata))) - } - Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)), + Ok((Some(new_fragment), bytes_written)) => Ok(FragmentChange::Modified( + Box::new(new_fragment.metadata), + bytes_written, + )), + Ok((None, _)) => Ok(FragmentChange::Removed(fragment_id as u64)), Err(e) => Err(e), } } else { @@ -63,12 +65,24 @@ async fn apply_deletions( while let Some(res) = stream.next().await.transpose()? { match res { FragmentChange::Unchanged => {} - FragmentChange::Modified(fragment) => updated_fragments.push(*fragment), + FragmentChange::Modified(fragment, bytes_written) => { + updated_fragments.push(*fragment); + total_bytes_written += bytes_written; + } FragmentChange::Removed(fragment_id) => removed_fragments.push(fragment_id), } } - Ok((updated_fragments, removed_fragments)) + Ok((updated_fragments, removed_fragments, total_bytes_written)) +} + +/// Result of a delete operation. +#[derive(Debug, Clone)] +pub struct DeleteResult { + /// The dataset after the delete operation. + pub dataset: Arc, + /// Statistics about the write operation. 
+ pub stats: WriteStats, } /// Builder for configuring delete operations with retry support @@ -84,10 +98,12 @@ async fn apply_deletions( /// # use lance::dataset::DeleteBuilder; /// # use std::sync::Arc; /// # async fn example(dataset: Arc) -> Result<()> { -/// let new_dataset = DeleteBuilder::new(dataset, "age > 65") +/// let result = DeleteBuilder::new(dataset, "age > 65") /// .conflict_retries(5) /// .execute() /// .await?; +/// // result.dataset is the new dataset +/// // result.stats contains write statistics /// # Ok(()) /// # } /// ``` @@ -124,7 +140,7 @@ impl DeleteBuilder { } /// Execute the delete operation - pub async fn execute(self) -> Result> { + pub async fn execute(self) -> Result { let job = DeleteJob { dataset: self.dataset.clone(), predicate: self.predicate, @@ -151,11 +167,13 @@ struct DeleteData { updated_fragments: Vec, deleted_fragment_ids: Vec, affected_rows: Option, + rows_deleted: u64, + bytes_written: u64, } impl RetryExecutor for DeleteJob { type Data = DeleteData; - type Result = Arc; + type Result = DeleteResult; async fn execute_impl(&self) -> Result { // Create a single scanner for the entire dataset @@ -166,69 +184,82 @@ impl RetryExecutor for DeleteJob { .filter(&self.predicate)?; // Check if the filter optimized to true (delete everything) or false (delete nothing) - let (updated_fragments, deleted_fragment_ids, affected_rows) = if let Some(filter_expr) = - scanner.get_expr_filter()? - { - if matches!( - filter_expr, - Expr::Literal(ScalarValue::Boolean(Some(false)), _) - ) { - // Predicate evaluated to false - no deletions - (Vec::new(), Vec::new(), Some(RowAddrTreeMap::new())) - } else if matches!( - filter_expr, - Expr::Literal(ScalarValue::Boolean(Some(true)), _) - ) { - // Predicate evaluated to true - delete all fragments - let deleted_fragment_ids = self - .dataset - .get_fragments() - .iter() - .map(|f| f.id() as u64) - .collect(); - - // When deleting everything, we don't have specific row addresses, - // so better not to emit affected rows. - (Vec::new(), deleted_fragment_ids, None) - } else { - // Regular predicate - scan and collect row addresses to delete - let stream = scanner.try_into_stream().await?.into(); - let (stream, row_id_rx) = - make_rowid_capture_stream(stream, self.dataset.manifest.uses_stable_row_ids())?; - - // Process the stream to capture row addresses - // We need to consume the stream to trigger the capture - futures::pin_mut!(stream); - while let Some(_batch) = stream.try_next().await? { - // The row addresses are captured automatically by make_rowid_capture_stream - } + let (updated_fragments, deleted_fragment_ids, affected_rows, rows_deleted, bytes_written) = + if let Some(filter_expr) = scanner.get_expr_filter()? { + if matches!( + filter_expr, + Expr::Literal(ScalarValue::Boolean(Some(false)), _) + ) { + // Predicate evaluated to false - no deletions + (Vec::new(), Vec::new(), Some(RowAddrTreeMap::new()), 0, 0) + } else if matches!( + filter_expr, + Expr::Literal(ScalarValue::Boolean(Some(true)), _) + ) { + // Predicate evaluated to true - delete all fragments + let fragments = self.dataset.get_fragments(); + let rows_deleted: u64 = + futures::future::try_join_all(fragments.iter().map(|f| f.count_rows(None))) + .await? + .into_iter() + .map(|n| n as u64) + .sum(); + let deleted_fragment_ids = fragments.iter().map(|f| f.id() as u64).collect(); + + // When deleting everything, we don't have specific row addresses, + // so better not to emit affected rows. No deletion files written. 
+ (Vec::new(), deleted_fragment_ids, None, rows_deleted, 0) + } else { + // Regular predicate - scan and collect row addresses to delete + let stream = scanner.try_into_stream().await?.into(); + let (stream, row_id_rx) = make_rowid_capture_stream( + stream, + self.dataset.manifest.uses_stable_row_ids(), + )?; + + // Process the stream to capture row addresses + // We need to consume the stream to trigger the capture + futures::pin_mut!(stream); + while let Some(_batch) = stream.try_next().await? { + // The row addresses are captured automatically by make_rowid_capture_stream + } - // Extract the row addresses from the receiver - let removed_row_ids = row_id_rx.try_recv().map_err(|err| Error::Internal { - message: format!("Failed to receive row ids: {}", err), - location: location!(), - })?; - let row_id_index = get_row_id_index(&self.dataset).await?; - let removed_row_addrs = removed_row_ids.row_addrs(row_id_index.as_deref()); - - let (fragments, deleted_ids) = - apply_deletions(&self.dataset, &removed_row_addrs).await?; - let affected_rows = RowAddrTreeMap::from(removed_row_addrs.as_ref().clone()); - (fragments, deleted_ids, Some(affected_rows)) - } - } else { - // No filter was applied - this shouldn't happen but treat as delete nothing - (Vec::new(), Vec::new(), Some(RowAddrTreeMap::new())) - }; + // Extract the row addresses from the receiver + let removed_row_ids = row_id_rx.try_recv().map_err(|err| Error::Internal { + message: format!("Failed to receive row ids: {}", err), + location: location!(), + })?; + let row_id_index = get_row_id_index(&self.dataset).await?; + let removed_row_addrs = removed_row_ids.row_addrs(row_id_index.as_deref()); + + let rows_deleted = removed_row_addrs.len(); + let (fragments, deleted_ids, bytes_written) = + apply_deletions(&self.dataset, &removed_row_addrs).await?; + let affected_rows = RowAddrTreeMap::from(removed_row_addrs.as_ref().clone()); + ( + fragments, + deleted_ids, + Some(affected_rows), + rows_deleted, + bytes_written, + ) + } + } else { + // No filter was applied - this shouldn't happen but treat as delete nothing + (Vec::new(), Vec::new(), Some(RowAddrTreeMap::new()), 0, 0) + }; Ok(DeleteData { updated_fragments, deleted_fragment_ids, affected_rows, + rows_deleted, + bytes_written, }) } async fn commit(&self, dataset: Arc, data: Self::Data) -> Result { + let num_updated_fragments = data.updated_fragments.len() as u64; let operation = Operation::Delete { updated_fragments: data.updated_fragments, deleted_fragment_ids: data.deleted_fragment_ids, @@ -242,7 +273,19 @@ impl RetryExecutor for DeleteJob { builder = builder.with_affected_rows(affected_rows); } - builder.execute(transaction).await.map(Arc::new) + let new_dataset = builder.execute(transaction).await?; + + let stats = WriteStats { + rows_deleted: data.rows_deleted, + files_written: num_updated_fragments, + bytes_written: data.bytes_written, + ..Default::default() + }; + + Ok(DeleteResult { + dataset: Arc::new(new_dataset), + stats, + }) } fn update_dataset(&mut self, dataset: Arc) { @@ -254,10 +297,10 @@ impl RetryExecutor for DeleteJob { pub async fn delete(ds: &mut Dataset, predicate: &str) -> Result<()> { // Use DeleteBuilder with 0 retries to maintain backwards compatibility let dataset = Arc::new(ds.clone()); - let new_dataset = DeleteBuilder::new(dataset, predicate).execute().await?; + let result = DeleteBuilder::new(dataset, predicate).execute().await?; // Update the dataset in place - *ds = Arc::try_unwrap(new_dataset).unwrap_or_else(|arc| (*arc).clone()); + *ds = 
Arc::try_unwrap(result.dataset).unwrap_or_else(|arc| (*arc).clone()); Ok(()) } @@ -431,7 +474,8 @@ mod tests { .with_params(&write_params) .execute(vec![data]) .await - .unwrap(); + .unwrap() + .dataset; dataset.validate().await.unwrap(); @@ -618,7 +662,8 @@ mod tests { } // Get the final dataset from any successful result - let final_dataset = results.into_iter().find_map(|r| r.ok()).unwrap(); + let final_result = results.into_iter().find_map(|r| r.ok()).unwrap(); + let final_dataset = final_result.dataset; // Rows 0-49 should be deleted, rows 50-99 should remain assert_eq!(final_dataset.count_rows(None).await.unwrap(), 50); @@ -697,7 +742,8 @@ mod tests { }) .execute(vec![initial_data]) .await - .unwrap(); + .unwrap() + .dataset; let barrier = Arc::new(Barrier::new(concurrency as usize)); let mut handles = Vec::new(); @@ -784,7 +830,8 @@ mod tests { }) .execute(vec![batch]) .await - .unwrap(); + .unwrap() + .dataset; // Verify we have 2 fragments initially assert_eq!(dataset.get_fragments().len(), 2); @@ -829,12 +876,65 @@ mod tests { ); // Also verify with the retry mechanism that it works correctly - let final_dataset = DeleteBuilder::new(dataset_arc, "true") + let final_result = DeleteBuilder::new(dataset_arc, "true") .conflict_retries(5) .execute() .await .unwrap(); // All rows should be deleted, including the updated ones - assert_eq!(final_dataset.count_rows(None).await.unwrap(), 0); + assert_eq!(final_result.dataset.count_rows(None).await.unwrap(), 0); + } + + #[tokio::test] + async fn test_delete_stats() { + fn sequence_data(range: Range) -> RecordBatch { + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::UInt32, + false, + )])); + RecordBatch::try_new(schema, vec![Arc::new(UInt32Array::from_iter_values(range))]) + .unwrap() + } + + // Write a dataset with 2 fragments + let data = sequence_data(0..100); + let batches = vec![data.slice(0, 50), data.slice(50, 50)]; + let write_params = WriteParams { + max_rows_per_file: 50, + ..Default::default() + }; + let dataset = InsertBuilder::new("memory://test_delete_stats") + .with_params(&write_params) + .execute(batches) + .await + .unwrap() + .dataset; + + assert_eq!(dataset.get_fragments().len(), 2); + + // Delete rows that span both fragments + let result = DeleteBuilder::new(Arc::new(dataset), "i < 10 OR i >= 90") + .execute() + .await + .unwrap(); + + // Verify stats + assert_eq!(result.stats.rows_deleted, 20); // 10 from each fragment + assert_eq!(result.stats.files_written, 2); // One deletion file per fragment + assert!(result.stats.bytes_written > 0); // Deletion files have non-zero size + assert_eq!(result.stats.rows_written, 0); + assert_eq!(result.stats.rows_updated, 0); + + // Delete from single fragment + let result2 = DeleteBuilder::new(result.dataset, "i < 20") + .execute() + .await + .unwrap(); + + // Only first fragment has new deletions (10-19), second fragment unchanged + assert_eq!(result2.stats.rows_deleted, 10); + assert_eq!(result2.stats.files_written, 1); + assert!(result2.stats.bytes_written > 0); } } diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index d1ee6db4145..2c14c4b5d72 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -32,6 +32,17 @@ use super::resolve_commit_handler; use super::WriteDestination; use super::WriteMode; use super::WriteParams; +use super::WriteStats; + +/// Result of an insert operation. 
+#[derive(Debug, Clone)] +pub struct InsertResult { + /// The dataset after the insert. + pub dataset: Dataset, + /// Statistics about the write operation. + pub stats: WriteStats, +} + /// Insert or create a new dataset. /// /// There are different variants of `execute()` methods. Those with the `_stream` @@ -65,15 +76,16 @@ impl<'a> InsertBuilder<'a> { /// Execute the insert operation with the given data. /// /// This writes the data fragments and commits them into the dataset. - pub async fn execute(&self, data: Vec) -> Result { - let (transaction, context) = self.write_uncommitted_impl(data).await?; - Self::do_commit(&context, transaction).await + pub async fn execute(&self, data: Vec) -> Result { + let (transaction, context, stats) = self.write_uncommitted_impl(data).await?; + let dataset = Self::do_commit(&context, transaction).await?; + Ok(InsertResult { dataset, stats }) } /// Execute the insert operation with the given stream. /// /// This writes the data fragments and commits them into the dataset. - pub async fn execute_stream(&self, source: impl StreamingWriteSource) -> Result { + pub async fn execute_stream(&self, source: impl StreamingWriteSource) -> Result { let (stream, schema) = source.into_stream_and_schema().await?; self.execute_stream_impl(stream, schema).await } @@ -82,9 +94,11 @@ impl<'a> InsertBuilder<'a> { &self, stream: SendableRecordBatchStream, schema: Schema, - ) -> Result { - let (transaction, context) = self.write_uncommitted_stream_impl(stream, schema).await?; - Self::do_commit(&context, transaction).await + ) -> Result { + let (transaction, context, stats) = + self.write_uncommitted_stream_impl(stream, schema).await?; + let dataset = Self::do_commit(&context, transaction).await?; + Ok(InsertResult { dataset, stats }) } /// Write data files, but don't commit the transaction yet. @@ -112,7 +126,7 @@ impl<'a> InsertBuilder<'a> { /// # } /// ``` pub async fn execute_uncommitted(&self, data: Vec) -> Result { - self.write_uncommitted_impl(data).await.map(|(t, _)| t) + self.write_uncommitted_impl(data).await.map(|(t, _, _)| t) } async fn do_commit(context: &WriteContext<'_>, transaction: Transaction) -> Result { @@ -138,7 +152,7 @@ impl<'a> InsertBuilder<'a> { async fn write_uncommitted_impl( &self, data: Vec, - ) -> Result<(Transaction, WriteContext<'_>)> { + ) -> Result<(Transaction, WriteContext<'_>, WriteStats)> { // TODO: This should be able to split the data up based on max_rows_per_file // and write in parallel. 
https://github.com/lance-format/lance/issues/1980 if data.is_empty() { @@ -169,7 +183,7 @@ impl<'a> InsertBuilder<'a> { source: impl StreamingWriteSource, ) -> Result { let (stream, schema) = source.into_stream_and_schema().await?; - let (transaction, _) = self.write_uncommitted_stream_impl(stream, schema).await?; + let (transaction, _, _) = self.write_uncommitted_stream_impl(stream, schema).await?; Ok(transaction) } @@ -177,7 +191,7 @@ impl<'a> InsertBuilder<'a> { &self, stream: SendableRecordBatchStream, schema: Schema, - ) -> Result<(Transaction, WriteContext<'_>)> { + ) -> Result<(Transaction, WriteContext<'_>, WriteStats)> { let mut context = self.resolve_context().await?; info!( @@ -204,9 +218,10 @@ impl<'a> InsertBuilder<'a> { ) .await?; + let stats = WriteStats::from_fragments(&written_fragments); let transaction = Self::build_transaction(schema, written_fragments, &context)?; - Ok((transaction, context)) + Ok((transaction, context, stats)) } fn build_transaction( @@ -444,7 +459,7 @@ mod test { #[tokio::test] async fn test_pass_session() { let session = Arc::new(Session::new(0, 0, Default::default())); - let dataset = InsertBuilder::new("memory://") + let result = InsertBuilder::new("memory://") .with_params(&WriteParams { session: Some(session.clone()), ..Default::default() @@ -456,7 +471,10 @@ mod test { .await .unwrap(); - assert_eq!(Arc::as_ptr(&dataset.session()), Arc::as_ptr(&session)); + assert_eq!( + Arc::as_ptr(&result.dataset.session()), + Arc::as_ptr(&session) + ); } #[tokio::test] @@ -473,13 +491,14 @@ mod test { vec![Arc::new(StructArray::new_empty_fields(1, None))], ) .unwrap(); - let dataset = InsertBuilder::new("memory://") + let result = InsertBuilder::new("memory://") .execute_stream(RecordBatchIterator::new(vec![Ok(batch)], schema.clone())) .await .unwrap(); assert_eq!( - dataset + result + .dataset .count_rows(Some("empties IS NOT NULL".to_string())) .await .unwrap(), @@ -493,7 +512,7 @@ mod test { let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1]))]) .unwrap(); - let dataset = InsertBuilder::new("memory://blob-version-guard") + let result = InsertBuilder::new("memory://blob-version-guard") .execute_stream(RecordBatchIterator::new( vec![Ok(batch.clone())], schema.clone(), @@ -501,7 +520,7 @@ mod test { .await .unwrap(); - let dataset = Arc::new(dataset); + let dataset = Arc::new(result.dataset); let params = WriteParams { mode: WriteMode::Overwrite, data_storage_version: Some(LanceFileVersion::V2_2), @@ -515,4 +534,26 @@ mod test { assert!(matches!(result, Err(Error::InvalidInput { .. 
}))); } + + #[tokio::test] + async fn test_insert_stats() { + let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from_iter_values(0..100))], + ) + .unwrap(); + + let result = InsertBuilder::new("memory://test_insert_stats") + .execute(vec![batch]) + .await + .unwrap(); + + // Verify stats + assert_eq!(result.stats.rows_written, 100); + assert_eq!(result.stats.files_written, 1); + assert!(result.stats.bytes_written > 0); + assert_eq!(result.stats.rows_updated, 0); + assert_eq!(result.stats.rows_deleted, 0); + } } diff --git a/rust/lance/src/dataset/write/merge_insert.rs b/rust/lance/src/dataset/write/merge_insert.rs index 02af35fb519..3c2738fd663 100644 --- a/rust/lance/src/dataset/write/merge_insert.rs +++ b/rust/lance/src/dataset/write/merge_insert.rs @@ -25,7 +25,7 @@ use assign_action::merge_insert_action; use inserted_rows::KeyExistenceFilter; use super::retry::{execute_with_retry, RetryConfig, RetryExecutor}; -use super::{write_fragments_internal, CommitBuilder, WriteParams}; +use super::{write_fragments_internal, CommitBuilder, WriteParams, WriteStats}; use crate::dataset::rowids::get_row_id_index; use crate::dataset::transaction::UpdateMode::{RewriteColumns, RewriteRows}; use crate::dataset::utils::CapturedRowIds; @@ -1713,10 +1713,10 @@ impl MergeInsertJob { let fragment_id = fragment.id(); if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) { match fragment.extend_deletions(*bitmap).await { - Ok(Some(new_fragment)) => { + Ok((Some(new_fragment), _)) => { Ok(FragmentChange::Modified(Box::new(new_fragment.metadata))) } - Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)), + Ok((None, _)) => Ok(FragmentChange::Removed(fragment_id as u64)), Err(e) => Err(e), } } else { @@ -1855,6 +1855,19 @@ pub struct MergeStats { pub num_skipped_duplicates: u64, } +impl MergeStats { + /// Returns write statistics in the common WriteStats format. + pub fn write_stats(&self) -> WriteStats { + WriteStats { + rows_written: self.num_inserted_rows, + rows_updated: self.num_updated_rows, + rows_deleted: self.num_deleted_rows, + files_written: self.num_files_written, + bytes_written: self.bytes_written, + } + } +} + pub struct UncommittedMergeInsert { pub transaction: Transaction, pub affected_rows: Option, @@ -2335,6 +2348,14 @@ mod tests { assert_eq!(merge_stats.num_updated_rows, stats[1]); assert_eq!(merge_stats.num_deleted_rows, stats[2]); + // Verify write_stats() accessor returns consistent values + let write_stats = merge_stats.write_stats(); + assert_eq!(write_stats.rows_written, merge_stats.num_inserted_rows); + assert_eq!(write_stats.rows_updated, merge_stats.num_updated_rows); + assert_eq!(write_stats.rows_deleted, merge_stats.num_deleted_rows); + assert_eq!(write_stats.files_written, merge_stats.num_files_written); + assert_eq!(write_stats.bytes_written, merge_stats.bytes_written); + merged_dataset } @@ -3553,7 +3574,8 @@ mod tests { }) .execute(vec![initial_data]) .await - .unwrap(); + .unwrap() + .dataset; // do merge inserts in parallel based on the concurrency. Each will open the dataset, // signal they have opened, and then wait for a signal to proceed. Once the signal @@ -3681,7 +3703,8 @@ mod tests { }) .execute(vec![initial_data]) .await - .unwrap(); + .unwrap() + .dataset; let dataset = Arc::new(dataset); // Start one merge insert, but don't commit it yet. 
@@ -4006,7 +4029,8 @@ mod tests { }) .execute(vec![initial_data]) .await - .unwrap(); + .unwrap() + .dataset; // Each merge insert will update one row. Combined, they should delete // all rows in the first fragment, and it should be dropped. @@ -4360,7 +4384,8 @@ mod tests { let dataset = InsertBuilder::new("memory://") .execute(vec![initial]) .await - .unwrap(); + .unwrap() + .dataset; let dataset = Arc::new(dataset); // Source with overlapping key 1 @@ -4441,7 +4466,8 @@ mod tests { let dataset = InsertBuilder::new("memory://") .execute(vec![initial]) .await - .unwrap(); + .unwrap() + .dataset; let dataset = Arc::new(dataset); // Both jobs update/insert the same key 2 @@ -4530,7 +4556,8 @@ mod tests { let dataset = InsertBuilder::new("memory://") .execute(vec![initial]) .await - .unwrap(); + .unwrap() + .dataset; let dataset = Arc::new(dataset); // Both jobs try to INSERT the same NEW key id=100 (doesn't exist in initial data) @@ -4620,7 +4647,8 @@ mod tests { let dataset = InsertBuilder::new("memory://") .execute(vec![initial]) .await - .unwrap(); + .unwrap() + .dataset; let dataset = Arc::new(dataset); // Create merge insert job based on version 1 @@ -4701,7 +4729,8 @@ mod tests { let dataset = InsertBuilder::new("memory://") .execute(vec![initial]) .await - .unwrap(); + .unwrap() + .dataset; let dataset = Arc::new(dataset); // Create merge insert job based on version 1 diff --git a/rust/lance/src/dataset/write/merge_insert/exec.rs b/rust/lance/src/dataset/write/merge_insert/exec.rs index 473051da181..8b0f6ba7a19 100644 --- a/rust/lance/src/dataset/write/merge_insert/exec.rs +++ b/rust/lance/src/dataset/write/merge_insert/exec.rs @@ -83,10 +83,10 @@ pub(super) async fn apply_deletions( let fragment_id = fragment.id(); if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) { match fragment.extend_deletions(*bitmap).await { - Ok(Some(new_fragment)) => { + Ok((Some(new_fragment), _)) => { Ok(FragmentChange::Modified(Box::new(new_fragment.metadata))) } - Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)), + Ok((None, _)) => Ok(FragmentChange::Removed(fragment_id as u64)), Err(e) => Err(e), } } else { diff --git a/rust/lance/src/dataset/write/update.rs b/rust/lance/src/dataset/write/update.rs index d2cac743f92..ad435e6cb20 100644 --- a/rust/lance/src/dataset/write/update.rs +++ b/rust/lance/src/dataset/write/update.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use std::time::Duration; use super::retry::{execute_with_retry, RetryConfig, RetryExecutor}; -use super::{write_fragments_internal, CommitBuilder, WriteParams}; +use super::{write_fragments_internal, CommitBuilder, WriteParams, WriteStats}; use crate::dataset::rowids::get_row_id_index; use crate::dataset::transaction::UpdateMode::RewriteRows; use crate::dataset::transaction::{Operation, Transaction}; @@ -234,6 +234,8 @@ impl UpdateBuilder { pub struct UpdateResult { pub new_dataset: Arc, pub rows_updated: u64, + /// Statistics about the write operation. 
+ pub stats: WriteStats, } #[derive(Debug)] @@ -243,6 +245,7 @@ pub struct UpdateData { new_fragments: Vec, affected_rows: RowAddrTreeMap, num_updated_rows: u64, + stats: WriteStats, } #[derive(Debug, Clone)] @@ -358,12 +361,18 @@ impl UpdateJob { .map(|f| f.physical_rows.unwrap() as u64) .sum::(); + let mut stats = WriteStats::from_fragments(&new_fragments); + stats.rows_updated = num_updated_rows; + // Update uses rewrite rows mode, so rows_written = rows_updated + stats.rows_written = 0; + Ok(UpdateData { removed_fragment_ids, old_fragments, new_fragments, affected_rows, num_updated_rows, + stats, }) } @@ -404,6 +413,7 @@ impl UpdateJob { Ok(UpdateResult { new_dataset: Arc::new(new_dataset), rows_updated: update_data.num_updated_rows, + stats: update_data.stats, }) } @@ -443,10 +453,10 @@ impl UpdateJob { let fragment_id = fragment.id(); if let Some(bitmap) = bitmaps_ref.get(&(fragment_id as u32)) { match fragment.extend_deletions(*bitmap).await { - Ok(Some(new_fragment)) => { + Ok((Some(new_fragment), _)) => { Ok(FragmentChange::Modified(Box::new(new_fragment.metadata))) } - Ok(None) => Ok(FragmentChange::Removed(fragment_id as u64)), + Ok((None, _)) => Ok(FragmentChange::Removed(fragment_id as u64)), Err(e) => Err(e), } } else { @@ -614,6 +624,14 @@ mod tests { .await .unwrap(); + // Verify stats + assert_eq!(update_result.rows_updated, 30); + assert_eq!(update_result.stats.rows_updated, 30); + assert_eq!(update_result.stats.files_written, 1); // One new fragment + assert!(update_result.stats.bytes_written > 0); + assert_eq!(update_result.stats.rows_written, 0); // Update rewrites, not insert + assert_eq!(update_result.stats.rows_deleted, 0); + let dataset = update_result.new_dataset; let actual_batches = dataset .scan() @@ -764,7 +782,8 @@ mod tests { }) .execute(vec![initial_data]) .await - .unwrap(); + .unwrap() + .dataset; let barrier = Arc::new(Barrier::new(concurrency as usize)); let mut handles = Vec::new(); @@ -858,7 +877,8 @@ mod tests { }) .execute(vec![initial_data]) .await - .unwrap(); + .unwrap() + .dataset; let barrier = Arc::new(Barrier::new(concurrency as usize)); let mut handles = Vec::new(); @@ -1264,7 +1284,8 @@ mod tests { }) .execute(vec![new_batch]) .await - .unwrap(); + .unwrap() + .dataset; assert_eq!(dataset.get_fragments().len(), 3); diff --git a/rust/lance/src/index/vector/ivf.rs b/rust/lance/src/index/vector/ivf.rs index 3f7d5f10a2a..6ce248ed7a5 100644 --- a/rust/lance/src/index/vector/ivf.rs +++ b/rust/lance/src/index/vector/ivf.rs @@ -2846,7 +2846,8 @@ mod tests { let mut dataset = InsertBuilder::new("memory://") .execute(vec![data]) .await - .unwrap(); + .unwrap() + .dataset; // Create index dataset @@ -2887,7 +2888,8 @@ mod tests { let mut dataset = InsertBuilder::new("memory://") .execute(vec![data]) .await - .unwrap(); + .unwrap() + .dataset; // Create index let index_params = VectorIndexParams::with_ivf_pq_params( @@ -2931,7 +2933,8 @@ mod tests { }) .execute(vec![data]) .await - .unwrap(); + .unwrap() + .dataset; check_index(&dataset, num_non_null, dims).await; // Optimize the index diff --git a/rust/lance/src/index/vector/ivf/v2.rs b/rust/lance/src/index/vector/ivf/v2.rs index 0e85378ab97..4c6805f4a6e 100644 --- a/rust/lance/src/index/vector/ivf/v2.rs +++ b/rust/lance/src/index/vector/ivf/v2.rs @@ -2417,7 +2417,8 @@ mod tests { }) .execute(vec![new_data]) .await - .unwrap(); + .unwrap() + .dataset; dataset .optimize_indices(&OptimizeOptions::merge(1)) .await diff --git a/rust/lance/src/index/vector/utils.rs b/rust/lance/src/index/vector/utils.rs 
index 3358f9093d5..f1b15e3f60b 100644 --- a/rust/lance/src/index/vector/utils.rs +++ b/rust/lance/src/index/vector/utils.rs @@ -540,7 +540,8 @@ mod tests { let dataset = InsertBuilder::new("memory://") .execute(vec![data]) .await - .unwrap(); + .unwrap() + .dataset; let training_data = maybe_sample_training_data(&dataset, "mv", 1000) .await @@ -568,7 +569,8 @@ mod tests { let dataset = InsertBuilder::new("memory://") .execute(vec![data]) .await - .unwrap(); + .unwrap() + .dataset; let n = estimate_multivector_vectors_per_row(&dataset, "mv", nrows) .await diff --git a/rust/lance/src/io/commit/conflict_resolver.rs b/rust/lance/src/io/commit/conflict_resolver.rs index 0fd52ab2a8b..a018702eedc 100644 --- a/rust/lance/src/io/commit/conflict_resolver.rs +++ b/rust/lance/src/io/commit/conflict_resolver.rs @@ -1516,7 +1516,7 @@ impl<'a> TransactionRebase<'a> { } } - let new_deletion_file = write_deletion_file( + let (new_deletion_file, _bytes_written) = write_deletion_file( &dataset.base, *fragment_id, dataset.manifest.version, @@ -1798,12 +1798,12 @@ mod tests { ], ) .unwrap(); - let dataset = InsertBuilder::new("memory://") + let result = InsertBuilder::new("memory://") .with_params(&write_params) .execute(vec![data]) .await .unwrap(); - dataset + result.dataset } /// Helper function for tests to create UpdateConfig operations using old-style parameters @@ -1944,7 +1944,7 @@ mod tests { current_deletions.extend(delete_rows.iter().copied()); - fragment.deletion_file = write_deletion_file( + let (deletion_file, _bytes_written) = write_deletion_file( &dataset.base, fragment.id, dataset.manifest.version, @@ -1953,6 +1953,7 @@ mod tests { ) .await .unwrap(); + fragment.deletion_file = deletion_file; let deletion_file = fragment.deletion_file.as_ref().unwrap(); let key = DeletionFileKey { diff --git a/rust/lance/src/io/commit/s3_test.rs b/rust/lance/src/io/commit/s3_test.rs index 35e64703688..6770da7a377 100644 --- a/rust/lance/src/io/commit/s3_test.rs +++ b/rust/lance/src/io/commit/s3_test.rs @@ -330,7 +330,8 @@ async fn test_ddb_open_iops() { }) .execute(vec![data.clone()]) .await - .unwrap(); + .unwrap() + .dataset; let io_stats = dataset.object_store().io_stats_incremental(); // Append: 5 IOPS: data file, transaction file, 3x manifest file assert_io_eq!(io_stats, write_iops, 5); diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index 0a32da7813a..76e33aa6b12 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -1961,7 +1961,8 @@ mod tests { }) .execute(new_data) .await - .unwrap(); + .unwrap() + .dataset; dataset .optimize_indices(&OptimizeOptions::new().index_names(vec![