diff --git a/Cargo.lock b/Cargo.lock index 43b46822cf..d6159ee39a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3575,6 +3575,7 @@ dependencies = [ "async-trait", "bytes", "cfg-if", + "futures", "iceberg", "iceberg_test_utils", "opendal", diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 341b19d090..594b070e03 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -19,6 +19,7 @@ use std::ops::Range; use std::sync::{Arc, OnceLock}; use bytes::Bytes; +use futures::{Stream, StreamExt}; use super::storage::{ LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory, @@ -140,6 +141,18 @@ impl FileIO { self.get_storage()?.delete_prefix(path.as_ref()).await } + /// Delete multiple files from a stream of paths. + /// + /// # Arguments + /// + /// * paths: A stream of absolute paths starting with the scheme string used to construct [`FileIO`]. + pub async fn delete_stream( + &self, + paths: impl Stream + Send + 'static, + ) -> Result<()> { + self.get_storage()?.delete_stream(paths.boxed()).await + } + /// Check file exists. /// /// # Arguments diff --git a/crates/iceberg/src/io/storage/local_fs.rs b/crates/iceberg/src/io/storage/local_fs.rs index d6dd5b433b..e96e951baa 100644 --- a/crates/iceberg/src/io/storage/local_fs.rs +++ b/crates/iceberg/src/io/storage/local_fs.rs @@ -29,6 +29,8 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; +use futures::StreamExt; +use futures::stream::BoxStream; use serde::{Deserialize, Serialize}; use crate::io::{ @@ -200,6 +202,13 @@ impl Storage for LocalFsStorage { Ok(()) } + async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> { + while let Some(path) = paths.next().await { + self.delete(&path).await?; + } + Ok(()) + } + fn new_input(&self, path: &str) -> Result { Ok(InputFile::new(Arc::new(self.clone()), path.to_string())) } @@ -534,4 +543,61 @@ mod tests { assert!(path.exists()); } + + #[tokio::test] + async fn test_local_fs_storage_delete_stream() { + use futures::stream; + + let tmp_dir = TempDir::new().unwrap(); + let storage = LocalFsStorage::new(); + + // Create multiple files + let file1 = tmp_dir.path().join("file1.txt"); + let file2 = tmp_dir.path().join("file2.txt"); + let file3 = tmp_dir.path().join("file3.txt"); + + storage + .write(file1.to_str().unwrap(), Bytes::from("1")) + .await + .unwrap(); + storage + .write(file2.to_str().unwrap(), Bytes::from("2")) + .await + .unwrap(); + storage + .write(file3.to_str().unwrap(), Bytes::from("3")) + .await + .unwrap(); + + // Verify files exist + assert!(storage.exists(file1.to_str().unwrap()).await.unwrap()); + assert!(storage.exists(file2.to_str().unwrap()).await.unwrap()); + assert!(storage.exists(file3.to_str().unwrap()).await.unwrap()); + + // Delete multiple files using stream + let paths = vec![ + file1.to_str().unwrap().to_string(), + file2.to_str().unwrap().to_string(), + ]; + let path_stream = stream::iter(paths).boxed(); + storage.delete_stream(path_stream).await.unwrap(); + + // Verify deleted files no longer exist + assert!(!storage.exists(file1.to_str().unwrap()).await.unwrap()); + assert!(!storage.exists(file2.to_str().unwrap()).await.unwrap()); + + // Verify file3 still exists + assert!(storage.exists(file3.to_str().unwrap()).await.unwrap()); + } + + #[tokio::test] + async fn test_local_fs_storage_delete_stream_empty() { + use futures::stream; + + let storage = LocalFsStorage::new(); + + // Delete with empty stream should succeed + let path_stream = stream::iter(Vec::::new()).boxed(); + storage.delete_stream(path_stream).await.unwrap(); + } } diff --git a/crates/iceberg/src/io/storage/memory.rs b/crates/iceberg/src/io/storage/memory.rs index cb01ee4709..f33dbd07b1 100644 --- a/crates/iceberg/src/io/storage/memory.rs +++ b/crates/iceberg/src/io/storage/memory.rs @@ -28,6 +28,8 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use bytes::Bytes; +use futures::StreamExt; +use futures::stream::BoxStream; use serde::{Deserialize, Serialize}; use crate::io::{ @@ -220,6 +222,13 @@ impl Storage for MemoryStorage { Ok(()) } + async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> { + while let Some(path) = paths.next().await { + self.delete(&path).await?; + } + Ok(()) + } + fn new_input(&self, path: &str) -> Result { Ok(InputFile::new(Arc::new(self.clone()), path.to_string())) } @@ -594,4 +603,56 @@ mod tests { assert_eq!(storage.read("/path/to/file").await.unwrap(), content); assert_eq!(storage.read("path/to/file").await.unwrap(), content); } + + #[tokio::test] + async fn test_memory_storage_delete_stream() { + use futures::stream; + + let storage = MemoryStorage::new(); + + // Create multiple files + storage + .write("memory://file1.txt", Bytes::from("1")) + .await + .unwrap(); + storage + .write("memory://file2.txt", Bytes::from("2")) + .await + .unwrap(); + storage + .write("memory://file3.txt", Bytes::from("3")) + .await + .unwrap(); + + // Verify files exist + assert!(storage.exists("memory://file1.txt").await.unwrap()); + assert!(storage.exists("memory://file2.txt").await.unwrap()); + assert!(storage.exists("memory://file3.txt").await.unwrap()); + + // Delete multiple files using stream + let paths = vec![ + "memory://file1.txt".to_string(), + "memory://file2.txt".to_string(), + ]; + let path_stream = stream::iter(paths).boxed(); + storage.delete_stream(path_stream).await.unwrap(); + + // Verify deleted files no longer exist + assert!(!storage.exists("memory://file1.txt").await.unwrap()); + assert!(!storage.exists("memory://file2.txt").await.unwrap()); + + // Verify file3 still exists + assert!(storage.exists("memory://file3.txt").await.unwrap()); + } + + #[tokio::test] + async fn test_memory_storage_delete_stream_empty() { + use futures::stream; + + let storage = MemoryStorage::new(); + + // Delete with empty stream should succeed + let path_stream = stream::iter(Vec::::new()).boxed(); + storage.delete_stream(path_stream).await.unwrap(); + } } diff --git a/crates/iceberg/src/io/storage/mod.rs b/crates/iceberg/src/io/storage/mod.rs index 3c7c555a55..5276c7771f 100644 --- a/crates/iceberg/src/io/storage/mod.rs +++ b/crates/iceberg/src/io/storage/mod.rs @@ -27,6 +27,7 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; pub use config::*; +use futures::stream::BoxStream; pub use local_fs::{LocalFsStorage, LocalFsStorageFactory}; pub use memory::{MemoryStorage, MemoryStorageFactory}; @@ -93,6 +94,9 @@ pub trait Storage: Debug + Send + Sync { /// Delete all files with the given prefix async fn delete_prefix(&self, path: &str) -> Result<()>; + /// Delete multiple files from a stream of paths. + async fn delete_stream(&self, paths: BoxStream<'static, String>) -> Result<()>; + /// Create a new input file for reading fn new_input(&self, path: &str) -> Result; diff --git a/crates/storage/opendal/Cargo.toml b/crates/storage/opendal/Cargo.toml index e0a3cf8ed6..84f7e1147a 100644 --- a/crates/storage/opendal/Cargo.toml +++ b/crates/storage/opendal/Cargo.toml @@ -49,6 +49,7 @@ reqwest = { workspace = true } serde = { workspace = true } typetag = { workspace = true } url = { workspace = true } +futures = { workspace = true } [dev-dependencies] async-trait = { workspace = true } diff --git a/crates/storage/opendal/src/azdls.rs b/crates/storage/opendal/src/azdls.rs index 70caae7c4e..6251f8cdaa 100644 --- a/crates/storage/opendal/src/azdls.rs +++ b/crates/storage/opendal/src/azdls.rs @@ -160,7 +160,7 @@ impl FromStr for AzureStorageScheme { } /// Validates whether the given path matches what's configured for the backend. -fn match_path_with_config( +pub(crate) fn match_path_with_config( path: &AzureStoragePath, config: &AzdlsConfig, configured_scheme: &AzureStorageScheme, @@ -220,7 +220,7 @@ fn azdls_config_build(config: &AzdlsConfig, path: &AzureStoragePath) -> Result(&self, path: &'a str) -> Result<&'a str> { + match self { + #[cfg(feature = "opendal-memory")] + OpenDalStorage::Memory(_) => Ok(path.strip_prefix("memory:/").unwrap_or(&path[1..])), + #[cfg(feature = "opendal-fs")] + OpenDalStorage::LocalFs => Ok(path.strip_prefix("file:/").unwrap_or(&path[1..])), + #[cfg(feature = "opendal-s3")] + OpenDalStorage::S3 { + configured_scheme, .. + } => { + let url = url::Url::parse(path)?; + let bucket = url.host_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid s3 url: {path}, missing bucket"), + ) + })?; + let prefix = format!("{}://{}/", configured_scheme, bucket); + if path.starts_with(&prefix) { + Ok(&path[prefix.len()..]) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid s3 url: {path}, should start with {prefix}"), + )) + } + } + #[cfg(feature = "opendal-gcs")] + OpenDalStorage::Gcs { .. } => { + let url = url::Url::parse(path)?; + let bucket = url.host_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid gcs url: {path}, missing bucket"), + ) + })?; + let prefix = format!("gs://{}/", bucket); + if path.starts_with(&prefix) { + Ok(&path[prefix.len()..]) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid gcs url: {path}, should start with {prefix}"), + )) + } + } + #[cfg(feature = "opendal-oss")] + OpenDalStorage::Oss { .. } => { + let url = url::Url::parse(path)?; + let bucket = url.host_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Invalid oss url: {path}, missing bucket"), + ) + })?; + let prefix = format!("oss://{}/", bucket); + if path.starts_with(&prefix) { + Ok(&path[prefix.len()..]) + } else { + Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid oss url: {path}, should start with {prefix}"), + )) + } + } + #[cfg(feature = "opendal-azdls")] + OpenDalStorage::Azdls { + configured_scheme, + config, + } => { + let azure_path = path.parse::()?; + match_path_with_config(&azure_path, config, configured_scheme)?; + let relative_path_len = azure_path.path.len(); + Ok(&path[path.len() - relative_path_len..]) + } + #[cfg(all( + not(feature = "opendal-s3"), + not(feature = "opendal-fs"), + not(feature = "opendal-gcs"), + not(feature = "opendal-oss"), + not(feature = "opendal-azdls"), + ))] + _ => Err(Error::new( + ErrorKind::FeatureUnsupported, + "No storage service has been enabled", + )), + } + } } #[typetag::serde(name = "OpenDalStorage")] @@ -400,6 +499,40 @@ impl Storage for OpenDalStorage { Ok(op.remove_all(&path).await.map_err(from_opendal_error)?) } + async fn delete_stream(&self, mut paths: BoxStream<'static, String>) -> Result<()> { + let mut deleters: HashMap = HashMap::new(); + + while let Some(path) = paths.next().await { + let bucket = url::Url::parse(&path) + .ok() + .and_then(|u| u.host_str().map(|s| s.to_string())) + .unwrap_or_default(); + + let (relative_path, deleter) = match deleters.entry(bucket) { + Entry::Occupied(entry) => { + (self.relativize_path(&path)?.to_string(), entry.into_mut()) + } + Entry::Vacant(entry) => { + let (op, rel) = self.create_operator(&path)?; + let rel = rel.to_string(); + let deleter = op.deleter().await.map_err(from_opendal_error)?; + (rel, entry.insert(deleter)) + } + }; + + deleter + .delete(relative_path) + .await + .map_err(from_opendal_error)?; + } + + for (_, mut deleter) in deleters { + deleter.close().await.map_err(from_opendal_error)?; + } + + Ok(()) + } + #[allow(unreachable_code, unused_variables)] fn new_input(&self, path: &str) -> Result { Ok(InputFile::new(Arc::new(self.clone()), path.to_string())) @@ -457,4 +590,182 @@ mod tests { let op = default_memory_operator(); assert_eq!(op.info().scheme().to_string(), "memory"); } + + #[cfg(feature = "opendal-memory")] + #[test] + fn test_relativize_path_memory() { + let storage = OpenDalStorage::Memory(default_memory_operator()); + + assert_eq!( + storage.relativize_path("memory:/path/to/file").unwrap(), + "path/to/file" + ); + // Without the scheme prefix, falls back to stripping the leading slash + assert_eq!( + storage.relativize_path("/path/to/file").unwrap(), + "path/to/file" + ); + } + + #[cfg(feature = "opendal-fs")] + #[test] + fn test_relativize_path_fs() { + let storage = OpenDalStorage::LocalFs; + + assert_eq!( + storage + .relativize_path("file:/tmp/data/file.parquet") + .unwrap(), + "tmp/data/file.parquet" + ); + assert_eq!( + storage.relativize_path("/tmp/data/file.parquet").unwrap(), + "tmp/data/file.parquet" + ); + } + + #[cfg(feature = "opendal-s3")] + #[test] + fn test_relativize_path_s3() { + let storage = OpenDalStorage::S3 { + configured_scheme: "s3".to_string(), + config: Arc::new(S3Config::default()), + customized_credential_load: None, + }; + + assert_eq!( + storage + .relativize_path("s3://my-bucket/path/to/file.parquet") + .unwrap(), + "path/to/file.parquet" + ); + + // s3a scheme + let storage_s3a = OpenDalStorage::S3 { + configured_scheme: "s3a".to_string(), + config: Arc::new(S3Config::default()), + customized_credential_load: None, + }; + assert_eq!( + storage_s3a + .relativize_path("s3a://my-bucket/path/to/file.parquet") + .unwrap(), + "path/to/file.parquet" + ); + } + + #[cfg(feature = "opendal-s3")] + #[test] + fn test_relativize_path_s3_scheme_mismatch() { + let storage = OpenDalStorage::S3 { + configured_scheme: "s3".to_string(), + config: Arc::new(S3Config::default()), + customized_credential_load: None, + }; + + // Scheme mismatch should error + assert!( + storage + .relativize_path("s3a://my-bucket/path/to/file.parquet") + .is_err() + ); + } + + #[cfg(feature = "opendal-gcs")] + #[test] + fn test_relativize_path_gcs() { + let storage = OpenDalStorage::Gcs { + config: Arc::new(GcsConfig::default()), + }; + + assert_eq!( + storage + .relativize_path("gs://my-bucket/path/to/file.parquet") + .unwrap(), + "path/to/file.parquet" + ); + } + + #[cfg(feature = "opendal-gcs")] + #[test] + fn test_relativize_path_gcs_invalid_scheme() { + let storage = OpenDalStorage::Gcs { + config: Arc::new(GcsConfig::default()), + }; + + assert!( + storage + .relativize_path("s3://my-bucket/path/to/file.parquet") + .is_err() + ); + } + + #[cfg(feature = "opendal-oss")] + #[test] + fn test_relativize_path_oss() { + let storage = OpenDalStorage::Oss { + config: Arc::new(OssConfig::default()), + }; + + assert_eq!( + storage + .relativize_path("oss://my-bucket/path/to/file.parquet") + .unwrap(), + "path/to/file.parquet" + ); + } + + #[cfg(feature = "opendal-oss")] + #[test] + fn test_relativize_path_oss_invalid_scheme() { + let storage = OpenDalStorage::Oss { + config: Arc::new(OssConfig::default()), + }; + + assert!( + storage + .relativize_path("s3://my-bucket/path/to/file.parquet") + .is_err() + ); + } + + #[cfg(feature = "opendal-azdls")] + #[test] + fn test_relativize_path_azdls() { + let storage = OpenDalStorage::Azdls { + configured_scheme: AzureStorageScheme::Abfss, + config: Arc::new(AzdlsConfig { + account_name: Some("myaccount".to_string()), + endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()), + ..Default::default() + }), + }; + + assert_eq!( + storage + .relativize_path("abfss://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet") + .unwrap(), + "/path/to/file.parquet" + ); + } + + #[cfg(feature = "opendal-azdls")] + #[test] + fn test_relativize_path_azdls_scheme_mismatch() { + let storage = OpenDalStorage::Azdls { + configured_scheme: AzureStorageScheme::Abfss, + config: Arc::new(AzdlsConfig { + account_name: Some("myaccount".to_string()), + endpoint: Some("https://myaccount.dfs.core.windows.net".to_string()), + ..Default::default() + }), + }; + + // wasbs scheme doesn't match configured abfss + assert!( + storage + .relativize_path("wasbs://myfs@myaccount.dfs.core.windows.net/path/to/file.parquet") + .is_err() + ); + } } diff --git a/crates/storage/opendal/tests/file_io_s3_test.rs b/crates/storage/opendal/tests/file_io_s3_test.rs index 5801af0606..207a4454d7 100644 --- a/crates/storage/opendal/tests/file_io_s3_test.rs +++ b/crates/storage/opendal/tests/file_io_s3_test.rs @@ -24,6 +24,7 @@ mod tests { use std::sync::Arc; use async_trait::async_trait; + use futures::StreamExt; use iceberg::io::{ FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, }; @@ -203,4 +204,46 @@ mod tests { } } } + + #[tokio::test] + async fn test_file_io_s3_delete_stream() { + let file_io = get_file_io().await; + + // Write multiple files + let paths: Vec = (0..5) + .map(|i| { + format!( + "s3://bucket1/{}/file-{i}", + normalize_test_name_with_parts!("test_file_io_s3_delete_stream") + ) + }) + .collect(); + for path in &paths { + let _ = file_io.delete(path).await; + file_io + .new_output(path) + .unwrap() + .write("delete-me".into()) + .await + .unwrap(); + assert!(file_io.exists(path).await.unwrap()); + } + + // Delete via delete_stream + let stream = futures::stream::iter(paths.clone()).boxed(); + file_io.delete_stream(stream).await.unwrap(); + + // Verify all files are gone + for path in &paths { + assert!(!file_io.exists(path).await.unwrap()); + } + } + + #[tokio::test] + async fn test_file_io_s3_delete_stream_empty() { + let file_io = get_file_io().await; + let stream = futures::stream::empty().boxed(); + // Should succeed with no-op + file_io.delete_stream(stream).await.unwrap(); + } }