Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 24 additions & 17 deletions rust/lance-table/src/io/deletion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,17 @@ pub fn relative_deletion_file_path(fragment_id: u64, deletion_file: &DeletionFil

/// Write a deletion file for a fragment for a given deletion vector.
///
/// Returns the deletion file if one was written. If no deletions were present,
/// returns `Ok(None)`.
/// Returns a tuple of (deletion_file, bytes_written). If no deletions were present,
/// returns `Ok((None, 0))`.
pub async fn write_deletion_file(
base: &Path,
fragment_id: u64,
read_version: u64,
removed_rows: &DeletionVector,
object_store: &ObjectStore,
) -> Result<Option<DeletionFile>> {
let deletion_file = match removed_rows {
DeletionVector::NoDeletions => None,
) -> Result<(Option<DeletionFile>, u64)> {
let (deletion_file, bytes_written) = match removed_rows {
DeletionVector::NoDeletions => (None, 0),
DeletionVector::Set(set) => {
let id = rand::rng().random::<u64>();
let deletion_file = DeletionFile {
Expand Down Expand Up @@ -101,11 +101,12 @@ pub async fn write_deletion_file(
// Drop writer so out is no longer borrowed.
}

let bytes_written = out.len() as u64;
object_store.put(&path, &out).await?;

info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DELETION, path = path.to_string());

Some(deletion_file)
(Some(deletion_file), bytes_written)
}
DeletionVector::Bitmap(bitmap) => {
let id = rand::rng().random::<u64>();
Expand All @@ -121,14 +122,15 @@ pub async fn write_deletion_file(
let mut out: Vec<u8> = Vec::new();
bitmap.serialize_into(&mut out)?;

let bytes_written = out.len() as u64;
object_store.put(&path, &out).await?;

info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_DELETION, path = path.to_string());

Some(deletion_file)
(Some(deletion_file), bytes_written)
}
};
Ok(deletion_file)
Ok((deletion_file, bytes_written))
}

#[instrument(
Expand Down Expand Up @@ -235,10 +237,11 @@ mod test {
let (object_store, path) = ObjectStore::from_uri("memory:///no_deletion")
.await
.unwrap();
let file = write_deletion_file(&path, 0, 0, &dv, &object_store)
let (file, bytes_written) = write_deletion_file(&path, 0, 0, &dv, &object_store)
.await
.unwrap();
assert!(file.is_none());
assert_eq!(bytes_written, 0);
}

#[tokio::test]
Expand All @@ -250,9 +253,10 @@ mod test {

let object_store = ObjectStore::memory();
let path = Path::from("/write");
let file = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store)
.await
.unwrap();
let (file, bytes_written) =
write_deletion_file(&path, fragment_id, read_version, &dv, &object_store)
.await
.unwrap();

assert!(matches!(
file,
Expand All @@ -261,6 +265,7 @@ mod test {
..
})
));
assert!(bytes_written > 0);

let file = file.unwrap();
assert_eq!(file.read_version, read_version);
Expand Down Expand Up @@ -304,9 +309,10 @@ mod test {

let object_store = ObjectStore::memory();
let path = Path::from("/bitmap");
let file = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store)
.await
.unwrap();
let (file, bytes_written) =
write_deletion_file(&path, fragment_id, read_version, &dv, &object_store)
.await
.unwrap();

assert!(matches!(
file,
Expand All @@ -315,6 +321,7 @@ mod test {
..
})
));
assert!(bytes_written > 0);

let file = file.unwrap();
assert_eq!(file.read_version, read_version);
Expand Down Expand Up @@ -346,7 +353,7 @@ mod test {

let object_store = ObjectStore::memory();
let path = Path::from("/roundtrip");
let file = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store)
let (file, _) = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store)
.await
.unwrap();

Expand All @@ -365,7 +372,7 @@ mod test {

let object_store = ObjectStore::memory();
let path = Path::from("/bitmap");
let file = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store)
let (file, _) = write_deletion_file(&path, fragment_id, read_version, &dv, &object_store)
.await
.unwrap();

Expand Down
17 changes: 10 additions & 7 deletions rust/lance/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ pub use write::merge_insert::{
WhenNotMatched, WhenNotMatchedBySource,
};

pub use write::update::{UpdateBuilder, UpdateJob};
pub use write::update::{UpdateBuilder, UpdateJob, UpdateResult};
#[allow(deprecated)]
pub use write::{
write_fragments, AutoCleanupParams, CommitBuilder, DeleteBuilder, InsertBuilder,
WriteDestination, WriteMode, WriteParams,
write_fragments, AutoCleanupParams, CommitBuilder, DeleteBuilder, DeleteResult, InsertBuilder,
InsertResult, WriteDestination, WriteMode, WriteParams, WriteStats,
};

pub(crate) const INDICES_DIR: &str = "_indices";
Expand Down Expand Up @@ -793,8 +793,11 @@ impl Dataset {
if let Some(params) = &params {
builder = builder.with_params(params);
}
Box::pin(builder.execute_stream(Box::new(batches) as Box<dyn RecordBatchReader + Send>))
.await
let result = Box::pin(
builder.execute_stream(Box::new(batches) as Box<dyn RecordBatchReader + Send>),
)
.await?;
Ok(result.dataset)
}

/// Write into a namespace-managed table with automatic credential vending.
Expand Down Expand Up @@ -969,12 +972,12 @@ impl Dataset {
..params.unwrap_or_default()
};

let new_dataset = InsertBuilder::new(WriteDestination::Dataset(Arc::new(self.clone())))
let result = InsertBuilder::new(WriteDestination::Dataset(Arc::new(self.clone())))
.with_params(&write_params)
.execute_stream(Box::new(batches) as Box<dyn RecordBatchReader + Send>)
.await?;

*self = new_dataset;
*self = result.dataset;

Ok(())
}
Expand Down
24 changes: 17 additions & 7 deletions rust/lance/src/dataset/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1792,13 +1792,18 @@ impl FileFragment {
return Ok(Some(self));
}

self.write_deletions(deletion_vector).await
let (fragment, _bytes_written) = self.write_deletions(deletion_vector).await?;
Ok(fragment)
}

/// Extend the deletion vector with new deletions.
///
/// Returns `(Some(fragment), bytes_written)` if the fragment still has rows,
/// or `(None, 0)` if all rows are deleted.
pub async fn extend_deletions(
self,
new_deletions: impl IntoIterator<Item = u32>,
) -> Result<Option<Self>> {
) -> Result<(Option<Self>, u64)> {
let mut deletion_vector = self
.get_deletion_vector()
.await?
Expand All @@ -1811,12 +1816,15 @@ impl FileFragment {
self.write_deletions(deletion_vector).await
}

async fn write_deletions(mut self, deletion_vector: DeletionVector) -> Result<Option<Self>> {
async fn write_deletions(
mut self,
deletion_vector: DeletionVector,
) -> Result<(Option<Self>, u64)> {
let physical_rows = self.physical_rows().await?;
if deletion_vector.len() == physical_rows
&& deletion_vector.contains_range(0..physical_rows as u32)
{
return Ok(None);
return Ok((None, 0));
} else if deletion_vector.len() >= physical_rows {
let dv_len = deletion_vector.len();
let examples: Vec<u32> = deletion_vector
Expand All @@ -1835,16 +1843,17 @@ impl FileFragment {
});
}

self.metadata.deletion_file = write_deletion_file(
let (deletion_file, bytes_written) = write_deletion_file(
&self.dataset.base,
self.metadata.id,
self.dataset.version().version,
&deletion_vector,
self.dataset.object_store(),
)
.await?;
self.metadata.deletion_file = deletion_file;

Ok(Some(self))
Ok((Some(self), bytes_written))
}
}

Expand Down Expand Up @@ -3924,7 +3933,8 @@ mod tests {
.with_params(&write_params)
.execute(vec![batch])
.await
.unwrap();
.unwrap()
.dataset;
let fragment = dataset.get_fragments().pop().unwrap();

// Assert file is small (< 4300 bytes)
Expand Down
15 changes: 10 additions & 5 deletions rust/lance/src/dataset/tests/dataset_merge_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1058,14 +1058,16 @@ async fn test_replace_dataset() {
let mut ds = InsertBuilder::new(&test_uri)
.execute(vec![data1])
.await
.unwrap();
.unwrap()
.dataset;

ds.object_store().remove_dir_all(test_path).await.unwrap();

let ds2 = InsertBuilder::new(&test_uri)
.execute(vec![data2.clone()])
.await
.unwrap();
.unwrap()
.dataset;

ds.checkout_latest().await.unwrap();
let roundtripped = ds.scan().try_into_batch().await.unwrap();
Expand Down Expand Up @@ -1121,7 +1123,8 @@ async fn test_insert_skip_auto_cleanup() {
.with_params(&write_params1)
.execute_stream(data1)
.await
.unwrap();
.unwrap()
.dataset;

assert_eq!(dataset2.version().version, 2);

Expand All @@ -1137,7 +1140,8 @@ async fn test_insert_skip_auto_cleanup() {
.with_params(&write_params1)
.execute_stream(data1_extra)
.await
.unwrap();
.unwrap()
.dataset;

assert_eq!(dataset2_extra.version().version, 3);

Expand Down Expand Up @@ -1170,7 +1174,8 @@ async fn test_insert_skip_auto_cleanup() {
.with_params(&write_params2)
.execute_stream(data2)
.await
.unwrap();
.unwrap()
.dataset;

assert_eq!(dataset3.version().version, 4);

Expand Down
9 changes: 6 additions & 3 deletions rust/lance/src/dataset/tests/dataset_transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ async fn test_session_store_registry() {
.with_params(&write_params)
.execute(vec![batch.clone()])
.await
.unwrap();
.unwrap()
.dataset;

// Assert there is one active store.
assert_eq!(registry.active_stores().len(), 1);
Expand All @@ -172,7 +173,8 @@ async fn test_session_store_registry() {
.with_params(&write_params)
.execute(vec![batch.clone()])
.await
.unwrap();
.unwrap()
.dataset;
assert_eq!(registry.active_stores().len(), 1);
assert_eq!(
Arc::as_ptr(&dataset.object_store().inner),
Expand All @@ -192,7 +194,8 @@ async fn test_session_store_registry() {
.with_params(&write_params2)
.execute(vec![batch.clone()])
.await
.unwrap();
.unwrap()
.dataset;
assert_eq!(registry.active_stores().len(), 2);
assert_ne!(
Arc::as_ptr(&dataset.object_store().inner),
Expand Down
36 changes: 34 additions & 2 deletions rust/lance/src/dataset/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,40 @@ mod retry;
pub mod update;

pub use commit::CommitBuilder;
pub use delete::DeleteBuilder;
pub use insert::InsertBuilder;
pub use delete::{DeleteBuilder, DeleteResult};
pub use insert::{InsertBuilder, InsertResult};

/// Statistics for write operations.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct WriteStats {
/// Number of rows written (inserted).
pub rows_written: u64,
/// Number of rows updated.
pub rows_updated: u64,
/// Number of rows deleted.
pub rows_deleted: u64,
/// Number of files written.
pub files_written: u64,
/// Total bytes written to storage.
pub bytes_written: u64,
}

impl WriteStats {
/// Calculate write stats from newly written fragments.
pub(crate) fn from_fragments(fragments: &[Fragment]) -> Self {
let mut stats = Self::default();
for fragment in fragments {
stats.rows_written += fragment.physical_rows.unwrap_or(0) as u64;
for data_file in &fragment.files {
if let Some(size) = data_file.file_size_bytes.get() {
stats.bytes_written += u64::from(size);
}
stats.files_written += 1;
}
}
stats
}
}

/// The destination to write data to.
#[derive(Debug, Clone)]
Expand Down
Loading
Loading