Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::ops::Range;
use std::sync::{Arc, OnceLock};

use bytes::Bytes;
use futures::{Stream, StreamExt};

use super::storage::{
LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory,
Expand Down Expand Up @@ -140,6 +141,18 @@ impl FileIO {
self.get_storage()?.delete_prefix(path.as_ref()).await
}

/// Delete multiple files from a stream of paths.
///
/// # Arguments
///
/// * paths: A stream of absolute paths starting with the scheme string used to construct [`FileIO`].
pub async fn delete_stream(
&self,
paths: impl Stream<Item = String> + Send + 'static,
) -> Result<()> {
self.get_storage()?.delete_stream(paths.boxed()).await
}

/// Check file exists.
///
/// # Arguments
Expand Down
66 changes: 66 additions & 0 deletions crates/iceberg/src/io/storage/local_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};

use crate::io::{
Expand Down Expand Up @@ -200,6 +202,13 @@ impl Storage for LocalFsStorage {
Ok(())
}

async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
while let Some(path) = paths.next().await {
self.delete(&path).await?;
}
Ok(())
}

fn new_input(&self, path: &str) -> Result<InputFile> {
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
}
Expand Down Expand Up @@ -534,4 +543,61 @@ mod tests {

assert!(path.exists());
}

#[tokio::test]
async fn test_local_fs_storage_delete_stream() {
use futures::stream;

let tmp_dir = TempDir::new().unwrap();
let storage = LocalFsStorage::new();

// Create multiple files
let file1 = tmp_dir.path().join("file1.txt");
let file2 = tmp_dir.path().join("file2.txt");
let file3 = tmp_dir.path().join("file3.txt");

storage
.write(file1.to_str().unwrap(), Bytes::from("1"))
.await
.unwrap();
storage
.write(file2.to_str().unwrap(), Bytes::from("2"))
.await
.unwrap();
storage
.write(file3.to_str().unwrap(), Bytes::from("3"))
.await
.unwrap();

// Verify files exist
assert!(storage.exists(file1.to_str().unwrap()).await.unwrap());
assert!(storage.exists(file2.to_str().unwrap()).await.unwrap());
assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());

// Delete multiple files using stream
let paths = vec![
file1.to_str().unwrap().to_string(),
file2.to_str().unwrap().to_string(),
];
let path_stream = stream::iter(paths).boxed();
storage.delete_stream(path_stream).await.unwrap();

// Verify deleted files no longer exist
assert!(!storage.exists(file1.to_str().unwrap()).await.unwrap());
assert!(!storage.exists(file2.to_str().unwrap()).await.unwrap());

// Verify file3 still exists
assert!(storage.exists(file3.to_str().unwrap()).await.unwrap());
}

#[tokio::test]
async fn test_local_fs_storage_delete_stream_empty() {
use futures::stream;

let storage = LocalFsStorage::new();

// Delete with empty stream should succeed
let path_stream = stream::iter(Vec::<String>::new()).boxed();
storage.delete_stream(path_stream).await.unwrap();
}
}
61 changes: 61 additions & 0 deletions crates/iceberg/src/io/storage/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use std::sync::{Arc, RwLock};

use async_trait::async_trait;
use bytes::Bytes;
use futures::StreamExt;
use futures::stream::BoxStream;
use serde::{Deserialize, Serialize};

use crate::io::{
Expand Down Expand Up @@ -220,6 +222,13 @@ impl Storage for MemoryStorage {
Ok(())
}

async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> {
while let Some(path) = paths.next().await {
self.delete(&path).await?;
}
Ok(())
}

fn new_input(&self, path: &str) -> Result<InputFile> {
Ok(InputFile::new(Arc::new(self.clone()), path.to_string()))
}
Expand Down Expand Up @@ -594,4 +603,56 @@ mod tests {
assert_eq!(storage.read("/path/to/file").await.unwrap(), content);
assert_eq!(storage.read("path/to/file").await.unwrap(), content);
}

#[tokio::test]
async fn test_memory_storage_delete_stream() {
use futures::stream;

let storage = MemoryStorage::new();

// Create multiple files
storage
.write("memory://file1.txt", Bytes::from("1"))
.await
.unwrap();
storage
.write("memory://file2.txt", Bytes::from("2"))
.await
.unwrap();
storage
.write("memory://file3.txt", Bytes::from("3"))
.await
.unwrap();

// Verify files exist
assert!(storage.exists("memory://file1.txt").await.unwrap());
assert!(storage.exists("memory://file2.txt").await.unwrap());
assert!(storage.exists("memory://file3.txt").await.unwrap());

// Delete multiple files using stream
let paths = vec![
"memory://file1.txt".to_string(),
"memory://file2.txt".to_string(),
];
let path_stream = stream::iter(paths).boxed();
storage.delete_stream(path_stream).await.unwrap();

// Verify deleted files no longer exist
assert!(!storage.exists("memory://file1.txt").await.unwrap());
assert!(!storage.exists("memory://file2.txt").await.unwrap());

// Verify file3 still exists
assert!(storage.exists("memory://file3.txt").await.unwrap());
}

#[tokio::test]
async fn test_memory_storage_delete_stream_empty() {
use futures::stream;

let storage = MemoryStorage::new();

// Delete with empty stream should succeed
let path_stream = stream::iter(Vec::<String>::new()).boxed();
storage.delete_stream(path_stream).await.unwrap();
}
}
4 changes: 4 additions & 0 deletions crates/iceberg/src/io/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes;
pub use config::*;
use futures::stream::BoxStream;
pub use local_fs::{LocalFsStorage, LocalFsStorageFactory};
pub use memory::{MemoryStorage, MemoryStorageFactory};

Expand Down Expand Up @@ -93,6 +94,9 @@ pub trait Storage: Debug + Send + Sync {
/// Delete all files with the given prefix
async fn delete_prefix(&self, path: &str) -> Result<()>;

/// Delete multiple files from a stream of paths.
async fn delete_stream(&self, paths: BoxStream<'static, String>) -> Result<()>;

/// Create a new input file for reading
fn new_input(&self, path: &str) -> Result<InputFile>;

Expand Down
1 change: 1 addition & 0 deletions crates/storage/opendal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ reqwest = { workspace = true }
serde = { workspace = true }
typetag = { workspace = true }
url = { workspace = true }
futures = { workspace = true }

[dev-dependencies]
async-trait = { workspace = true }
Expand Down
6 changes: 3 additions & 3 deletions crates/storage/opendal/src/azdls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl FromStr for AzureStorageScheme {
}

/// Validates whether the given path matches what's configured for the backend.
fn match_path_with_config(
pub(crate) fn match_path_with_config(
path: &AzureStoragePath,
config: &AzdlsConfig,
configured_scheme: &AzureStorageScheme,
Expand Down Expand Up @@ -220,7 +220,7 @@ fn azdls_config_build(config: &AzdlsConfig, path: &AzureStoragePath) -> Result<o

/// Represents a fully qualified path to blob/ file in Azure Storage.
#[derive(Debug, PartialEq)]
struct AzureStoragePath {
pub(crate) struct AzureStoragePath {
/// The scheme of the URL, e.g., `abfss`, `abfs`, `wasbs`, or `wasb`.
scheme: AzureStorageScheme,

Expand All @@ -236,7 +236,7 @@ struct AzureStoragePath {
/// Path to the file.
///
/// It is relative to the `root` of the `AzdlsConfig`.
path: String,
pub(crate) path: String,
}

impl AzureStoragePath {
Expand Down
Loading
Loading