From 7c857cf70f2d83bb7a5ca68597063b77c7f70d9a Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 4 Mar 2026 14:31:34 -0800 Subject: [PATCH 1/9] move opendal to iceberg-storage-opendal --- Cargo.lock | 26 ++++- Cargo.toml | 2 + bindings/python/Cargo.toml | 1 + .../python/src/datafusion_table_provider.rs | 3 +- crates/catalog/glue/Cargo.toml | 1 + crates/catalog/glue/src/catalog.rs | 5 +- .../catalog/glue/tests/glue_catalog_test.rs | 6 +- crates/catalog/hms/Cargo.toml | 1 + crates/catalog/hms/tests/hms_catalog_test.rs | 6 +- crates/catalog/s3tables/Cargo.toml | 1 + crates/catalog/s3tables/src/catalog.rs | 3 +- crates/examples/Cargo.toml | 3 +- crates/examples/src/oss_backend.rs | 2 +- crates/iceberg/Cargo.toml | 12 +-- crates/iceberg/src/catalog/memory/catalog.rs | 12 ++- crates/iceberg/src/catalog/mod.rs | 3 +- crates/iceberg/src/error.rs | 6 -- crates/iceberg/src/io/storage/mod.rs | 4 - crates/integration_tests/Cargo.toml | 1 + crates/integration_tests/tests/common/mod.rs | 2 +- .../tests/conflict_commit_test.rs | 2 +- .../tests/read_evolved_schema.rs | 2 +- .../tests/read_positional_deletes.rs | 2 +- crates/storage/opendal/Cargo.toml | 58 ++++++++++ .../opendal => storage/opendal/src}/azdls.rs | 18 +++- .../opendal => storage/opendal/src}/fs.rs | 7 +- .../opendal => storage/opendal/src}/gcs.rs | 10 +- crates/storage/opendal/src/lib.rs | 44 ++++++++ .../opendal => storage/opendal/src}/memory.rs | 7 +- .../opendal => storage/opendal/src}/oss.rs | 8 +- .../opendal => storage/opendal/src}/s3.rs | 10 +- .../mod.rs => storage/opendal/src/storage.rs} | 102 ++++++++++-------- crates/storage/opendal/src/utils.rs | 26 +++++ .../opendal}/tests/file_io_s3_test.rs | 6 +- 34 files changed, 293 insertions(+), 109 deletions(-) create mode 100644 crates/storage/opendal/Cargo.toml rename crates/{iceberg/src/io/storage/opendal => storage/opendal/src}/azdls.rs (97%) rename crates/{iceberg/src/io/storage/opendal => storage/opendal/src}/fs.rs (87%) rename crates/{iceberg/src/io/storage/opendal => storage/opendal/src}/gcs.rs (92%) create mode 100644 crates/storage/opendal/src/lib.rs rename crates/{iceberg/src/io/storage/opendal => storage/opendal/src}/memory.rs (84%) rename crates/{iceberg/src/io/storage/opendal => storage/opendal/src}/oss.rs (89%) rename crates/{iceberg/src/io/storage/opendal => storage/opendal/src}/s3.rs (96%) rename crates/{iceberg/src/io/storage/opendal/mod.rs => storage/opendal/src/storage.rs} (85%) create mode 100644 crates/storage/opendal/src/utils.rs rename crates/{iceberg => storage/opendal}/tests/file_io_s3_test.rs (97%) diff --git a/Cargo.lock b/Cargo.lock index 8f5df819a2..92bdb6d27a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3299,13 +3299,11 @@ dependencies = [ "moka", "murmur3", "once_cell", - "opendal", "ordered-float 4.6.0", "parquet", "pretty_assertions", "rand 0.8.5", "regex", - "reqsign", "reqwest", "roaring", "serde", @@ -3341,6 +3339,7 @@ dependencies = [ "aws-config", "aws-sdk-glue", "iceberg", + "iceberg-storage-opendal", "iceberg_test_utils", "serde_json", "tokio", @@ -3357,6 +3356,7 @@ dependencies = [ "faststr", "hive_metastore", "iceberg", + "iceberg-storage-opendal", "iceberg_test_utils", "linkedbytes", "metainfo", @@ -3415,6 +3415,7 @@ dependencies = [ "aws-config", "aws-sdk-s3tables", "iceberg", + "iceberg-storage-opendal", "iceberg_test_utils", "itertools 0.13.0", "tokio", @@ -3458,6 +3459,7 @@ dependencies = [ "futures", "iceberg", "iceberg-catalog-rest", + "iceberg-storage-opendal", "tokio", ] @@ -3470,6 +3472,7 @@ dependencies = [ "futures", "iceberg", "iceberg-catalog-rest", + "iceberg-storage-opendal", "iceberg_test_utils", "ordered-float 2.10.1", "parquet", @@ -3521,6 +3524,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "iceberg-storage-opendal" +version = "0.8.0" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "iceberg", + "iceberg_test_utils", + "opendal", + "reqsign", + "reqwest", + "serde", + "serde_derive", + "tokio", + "typetag", + "url", +] + [[package]] name = "iceberg_test_utils" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 23b498214a..78a9335ad1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "crates/integration_tests", "crates/integrations/*", "crates/sqllogictest", + "crates/storage/*", "crates/test_utils", ] resolver = "2" @@ -85,6 +86,7 @@ iceberg-catalog-rest = { version = "0.8.0", path = "./crates/catalog/rest" } iceberg-catalog-s3tables = { version = "0.8.0", path = "./crates/catalog/s3tables" } iceberg-catalog-sql = { version = "0.8.0", path = "./crates/catalog/sql" } iceberg-datafusion = { version = "0.8.0", path = "./crates/integrations/datafusion" } +iceberg-storage-opendal = { version = "0.8.0", path = "./crates/storage/opendal" } indicatif = "0.18" itertools = "0.13" libtest-mimic = "0.8.1" diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 9279983f96..7d28cb0bb2 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -33,6 +33,7 @@ crate-type = ["cdylib"] [dependencies] arrow = { version = "57.1", features = ["pyarrow", "chrono-tz"] } iceberg = { path = "../../crates/iceberg" } +iceberg-storage-opendal = { path = "../../crates/storage/opendal", features = ["storage-s3", "storage-fs", "storage-memory"] } pyo3 = { version = "0.26", features = ["extension-module", "abi3-py310"] } iceberg-datafusion = { path = "../../crates/integrations/datafusion" } datafusion-ffi = { version = "52.1" } diff --git a/bindings/python/src/datafusion_table_provider.rs b/bindings/python/src/datafusion_table_provider.rs index d4a0234ff7..33923128fe 100644 --- a/bindings/python/src/datafusion_table_provider.rs +++ b/bindings/python/src/datafusion_table_provider.rs @@ -22,7 +22,8 @@ use std::sync::Arc; use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec; use datafusion_ffi::table_provider::FFI_TableProvider; use iceberg::TableIdent; -use iceberg::io::{FileIOBuilder, OpenDalStorageFactory, StorageFactory}; +use iceberg::io::{FileIOBuilder, StorageFactory}; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::table::StaticTable; use iceberg_datafusion::table::IcebergStaticTableProvider; use pyo3::exceptions::{PyRuntimeError, PyValueError}; diff --git a/crates/catalog/glue/Cargo.toml b/crates/catalog/glue/Cargo.toml index f42fedeae1..d4efbf54e3 100644 --- a/crates/catalog/glue/Cargo.toml +++ b/crates/catalog/glue/Cargo.toml @@ -34,6 +34,7 @@ async-trait = { workspace = true } aws-config = { workspace = true } aws-sdk-glue = { workspace = true } iceberg = { workspace = true } +iceberg-storage-opendal = { workspace = true, features = ["storage-s3"] } serde_json = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 7894900068..31d9293ef4 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -25,9 +25,10 @@ use aws_sdk_glue::operation::create_table::CreateTableError; use aws_sdk_glue::operation::update_table::UpdateTableError; use aws_sdk_glue::types::TableInput; use iceberg::io::{ - FileIO, FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, - S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, StorageFactory, + FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, + S3_SESSION_TOKEN, StorageFactory, }; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs b/crates/catalog/glue/tests/glue_catalog_test.rs index a209ae09c8..b29da8fe7d 100644 --- a/crates/catalog/glue/tests/glue_catalog_test.rs +++ b/crates/catalog/glue/tests/glue_catalog_test.rs @@ -23,10 +23,8 @@ use std::collections::HashMap; use std::sync::Arc; -use iceberg::io::{ - FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, - S3_SECRET_ACCESS_KEY, -}; +use iceberg::io::{FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::{ diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index a6517fb7b0..b5a549ce4d 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -54,6 +54,7 @@ motore-macros = { workspace = true } volo = { workspace = true } [dev-dependencies] +iceberg-storage-opendal = { workspace = true, features = ["storage-s3"] } iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } [package.metadata.cargo-machete] diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index c5d77ac37a..16b99e7e52 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -23,10 +23,8 @@ use std::collections::HashMap; use std::sync::Arc; -use iceberg::io::{ - FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, - S3_SECRET_ACCESS_KEY, -}; +use iceberg::io::{FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; use iceberg_catalog_hms::{ diff --git a/crates/catalog/s3tables/Cargo.toml b/crates/catalog/s3tables/Cargo.toml index fde08b9a49..dbd2ef008a 100644 --- a/crates/catalog/s3tables/Cargo.toml +++ b/crates/catalog/s3tables/Cargo.toml @@ -35,6 +35,7 @@ async-trait = { workspace = true } aws-config = { workspace = true } aws-sdk-s3tables = { workspace = true } iceberg = { workspace = true } +iceberg-storage-opendal = { workspace = true, features = ["storage-s3"] } [dev-dependencies] diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 58100ccbce..54cd1efc0b 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -26,7 +26,8 @@ use aws_sdk_s3tables::operation::get_table::GetTableOutput; use aws_sdk_s3tables::operation::list_tables::ListTablesOutput; use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError; use aws_sdk_s3tables::types::OpenTableFormat; -use iceberg::io::{FileIO, FileIOBuilder, OpenDalStorageFactory, StorageFactory}; +use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index c7874d9a17..97ec82f318 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -28,6 +28,7 @@ version = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } +iceberg-storage-opendal = { workspace = true, optional = true } tokio = { workspace = true, features = ["full"] } [[example]] @@ -45,4 +46,4 @@ required-features = ["storage-oss"] [features] default = [] -storage-oss = ["iceberg/storage-oss"] +storage-oss = ["iceberg-storage-opendal/storage-oss"] diff --git a/crates/examples/src/oss_backend.rs b/crates/examples/src/oss_backend.rs index 9835b8dc44..0cf833f5ac 100644 --- a/crates/examples/src/oss_backend.rs +++ b/crates/examples/src/oss_backend.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; use std::sync::Arc; use futures::stream::StreamExt; -use iceberg::io::OpenDalStorageFactory; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableIdent}; use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalogBuilder}; diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index d6d931c86d..41ee771617 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -29,15 +29,7 @@ license = { workspace = true } repository = { workspace = true } [features] -default = ["storage-memory", "storage-fs", "storage-s3"] -storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"] - -storage-azdls = ["opendal/services-azdls"] -storage-fs = ["opendal/services-fs"] -storage-gcs = ["opendal/services-gcs"] -storage-memory = ["opendal/services-memory"] -storage-oss = ["opendal/services-oss"] -storage-s3 = ["opendal/services-s3", "reqsign"] +default = [] [dependencies] @@ -68,11 +60,9 @@ itertools = { workspace = true } moka = { version = "0.12.10", features = ["future"] } murmur3 = { workspace = true } once_cell = { workspace = true } -opendal = { workspace = true } ordered-float = { workspace = true } parquet = { workspace = true, features = ["async"] } rand = { workspace = true } -reqsign = { version = "0.16.3", optional = true, default-features = false } reqwest = { workspace = true } roaring = { workspace = true } fastnum = { workspace = true } diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index e008de8050..32ed8ade67 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -25,7 +25,7 @@ use futures::lock::{Mutex, MutexGuard}; use itertools::Itertools; use super::namespace_state::NamespaceState; -use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory}; +use crate::io::{FileIO, FileIOBuilder, StorageFactory}; use crate::spec::{TableMetadata, TableMetadataBuilder}; use crate::table::Table; use crate::{ @@ -128,12 +128,16 @@ impl MemoryCatalog { config: MemoryCatalogConfig, storage_factory: Option>, ) -> Result { - // Use provided factory or default to MemoryStorageFactory - let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory)); + let file_io = match storage_factory { + Some(factory) => FileIOBuilder::new(factory) + .with_props(config.props) + .build(), + None => FileIO::new_with_memory(), + }; Ok(Self { root_namespace_state: Mutex::new(NamespaceState::default()), - file_io: FileIOBuilder::new(factory).with_props(config.props).build(), + file_io, warehouse_location: config.warehouse, }) } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index e5014c3e70..8db9674ad9 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -129,7 +129,8 @@ pub trait CatalogBuilder: Default + Debug + Send + Sync { /// /// ```rust,ignore /// use iceberg::CatalogBuilder; - /// use iceberg::io::{OpenDalStorageFactory, StorageFactory}; + /// use iceberg::io::StorageFactory; + /// use iceberg_storage_opendal::OpenDalStorageFactory; /// use std::sync::Arc; /// /// let catalog = MyCatalogBuilder::default() diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 8810ef5009..a0399a8082 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -384,12 +384,6 @@ define_from_err!( "Failure in conversion with avro" ); -define_from_err!( - opendal::Error, - ErrorKind::Unexpected, - "Failure in doing io operation" -); - define_from_err!( url::ParseError, ErrorKind::DataInvalid, diff --git a/crates/iceberg/src/io/storage/mod.rs b/crates/iceberg/src/io/storage/mod.rs index 31ceaf2436..2b4f9311b1 100644 --- a/crates/iceberg/src/io/storage/mod.rs +++ b/crates/iceberg/src/io/storage/mod.rs @@ -20,7 +20,6 @@ mod config; mod local_fs; mod memory; -mod opendal; use std::fmt::Debug; use std::sync::Arc; @@ -30,9 +29,6 @@ use bytes::Bytes; pub use config::*; pub use local_fs::{LocalFsStorage, LocalFsStorageFactory}; pub use memory::{MemoryStorage, MemoryStorageFactory}; -#[cfg(feature = "storage-s3")] -pub use opendal::CustomAwsCredentialLoader; -pub use opendal::{OpenDalStorage, OpenDalStorageFactory}; use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; use crate::Result; diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index 2ed211769f..0fb609f754 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -30,6 +30,7 @@ arrow-schema = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } +iceberg-storage-opendal = { workspace = true, features = ["storage-s3"] } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } parquet = { workspace = true } tokio = { workspace = true } diff --git a/crates/integration_tests/tests/common/mod.rs b/crates/integration_tests/tests/common/mod.rs index 957c8be5cc..8122baee01 100644 --- a/crates/integration_tests/tests/common/mod.rs +++ b/crates/integration_tests/tests/common/mod.rs @@ -18,7 +18,7 @@ use std::collections::HashMap; use std::sync::Arc; -use iceberg::io::OpenDalStorageFactory; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent}; use iceberg_catalog_rest::RestCatalogBuilder; diff --git a/crates/integration_tests/tests/conflict_commit_test.rs b/crates/integration_tests/tests/conflict_commit_test.rs index dc3030519f..268e94aaf2 100644 --- a/crates/integration_tests/tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/conflict_commit_test.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use common::{random_ns, test_schema}; use futures::TryStreamExt; -use iceberg::io::OpenDalStorageFactory; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; diff --git a/crates/integration_tests/tests/read_evolved_schema.rs b/crates/integration_tests/tests/read_evolved_schema.rs index 78d833b3a1..3efc071ae0 100644 --- a/crates/integration_tests/tests/read_evolved_schema.rs +++ b/crates/integration_tests/tests/read_evolved_schema.rs @@ -22,7 +22,7 @@ use std::sync::Arc; use arrow_array::{Decimal128Array, Float64Array, Int64Array, StringArray}; use futures::TryStreamExt; use iceberg::expr::Reference; -use iceberg::io::OpenDalStorageFactory; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::Datum; use iceberg::{Catalog, CatalogBuilder, TableIdent}; use iceberg_catalog_rest::RestCatalogBuilder; diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/read_positional_deletes.rs index 71ff128d1b..54da6b5bd4 100644 --- a/crates/integration_tests/tests/read_positional_deletes.rs +++ b/crates/integration_tests/tests/read_positional_deletes.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use futures::TryStreamExt; -use iceberg::io::OpenDalStorageFactory; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::{Catalog, CatalogBuilder, TableIdent}; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_integration_tests::get_test_fixture; diff --git a/crates/storage/opendal/Cargo.toml b/crates/storage/opendal/Cargo.toml new file mode 100644 index 0000000000..4ff80f1863 --- /dev/null +++ b/crates/storage/opendal/Cargo.toml @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-storage-opendal" +edition = { workspace = true } +version = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg OpenDAL storage implementation" +keywords = ["iceberg", "opendal", "storage"] + +[features] +default = ["storage-memory", "storage-fs", "storage-s3"] +storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs", "storage-oss", "storage-azdls"] + +storage-azdls = ["opendal/services-azdls"] +storage-fs = ["opendal/services-fs"] +storage-gcs = ["opendal/services-gcs"] +storage-memory = ["opendal/services-memory"] +storage-oss = ["opendal/services-oss"] +storage-s3 = ["opendal/services-s3", "reqsign"] + +[dependencies] +anyhow = { workspace = true } +iceberg = { workspace = true } +opendal = { workspace = true } +async-trait = { workspace = true } +bytes = { workspace = true } +reqsign = { version = "0.16.3", optional = true, default-features = false } +reqwest = { workspace = true } +serde = { workspace = true } +serde_derive = { workspace = true } +typetag = { workspace = true } +url = { workspace = true } + +[dev-dependencies] +async-trait = { workspace = true } +iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +reqsign = { version = "0.16.3", default-features = false } +reqwest = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/crates/iceberg/src/io/storage/opendal/azdls.rs b/crates/storage/opendal/src/azdls.rs similarity index 97% rename from crates/iceberg/src/io/storage/opendal/azdls.rs rename to crates/storage/opendal/src/azdls.rs index 759ff301ea..c27a9aef44 100644 --- a/crates/iceberg/src/io/storage/opendal/azdls.rs +++ b/crates/storage/opendal/src/azdls.rs @@ -24,11 +24,23 @@ use opendal::services::AzdlsConfig; use serde::{Deserialize, Serialize}; use url::Url; -use crate::io::{ +use iceberg::io::{ ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_AUTHORITY_HOST, ADLS_CLIENT_ID, ADLS_CLIENT_SECRET, ADLS_CONNECTION_STRING, ADLS_SAS_TOKEN, ADLS_TENANT_ID, }; -use crate::{Error, ErrorKind, Result, ensure_data_valid}; +use iceberg::{Error, ErrorKind, Result}; + +use crate::utils::from_opendal_error; + +/// Local version of ensure_data_valid macro since the iceberg crate's macro +/// references private modules via $crate paths. +macro_rules! ensure_data_valid { + ($cond:expr, $fmt:literal, $($arg:tt)*) => { + if !$cond { + return Err(Error::new(ErrorKind::DataInvalid, format!($fmt, $($arg)*))); + } + }; +} /// Parses adls.* prefixed configuration properties. pub(crate) fn azdls_config_parse(mut properties: HashMap) -> Result { @@ -201,7 +213,7 @@ fn azdls_config_build(config: &AzdlsConfig, path: &AzureStoragePath) -> Result Result { let mut cfg = FsConfig::default(); cfg.root = Some("/".to_string()); - Ok(Operator::from_config(cfg)?.finish()) + Ok(Operator::from_config(cfg) + .map_err(from_opendal_error)? + .finish()) } diff --git a/crates/iceberg/src/io/storage/opendal/gcs.rs b/crates/storage/opendal/src/gcs.rs similarity index 92% rename from crates/iceberg/src/io/storage/opendal/gcs.rs rename to crates/storage/opendal/src/gcs.rs index 4cb8aa8591..9c2cbe9dba 100644 --- a/crates/iceberg/src/io/storage/opendal/gcs.rs +++ b/crates/storage/opendal/src/gcs.rs @@ -22,11 +22,13 @@ use opendal::Operator; use opendal::services::GcsConfig; use url::Url; -use crate::io::{ +use iceberg::io::{ GCS_ALLOW_ANONYMOUS, GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, GCS_DISABLE_VM_METADATA, - GCS_NO_AUTH, GCS_SERVICE_PATH, GCS_TOKEN, is_truthy, + GCS_NO_AUTH, GCS_SERVICE_PATH, GCS_TOKEN, }; -use crate::{Error, ErrorKind, Result}; +use iceberg::{Error, ErrorKind, Result}; + +use crate::utils::{from_opendal_error, is_truthy}; /// Parse iceberg properties to [`GcsConfig`]. pub(crate) fn gcs_config_parse(mut m: HashMap) -> Result { @@ -81,5 +83,5 @@ pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result let mut cfg = cfg.clone(); cfg.bucket = bucket.to_string(); - Ok(Operator::from_config(cfg)?.finish()) + Ok(Operator::from_config(cfg).map_err(from_opendal_error)?.finish()) } diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs new file mode 100644 index 0000000000..85f674eed8 --- /dev/null +++ b/crates/storage/opendal/src/lib.rs @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! OpenDAL-based storage implementation for Apache Iceberg. +//! +//! This crate provides [`OpenDalStorage`] and [`OpenDalStorageFactory`], +//! which implement the [`Storage`](iceberg::io::Storage) and +//! [`StorageFactory`](iceberg::io::StorageFactory) traits from the `iceberg` crate +//! using [OpenDAL](https://opendal.apache.org/) as the backend. + +mod storage; +mod utils; + +#[cfg(feature = "storage-azdls")] +mod azdls; +#[cfg(feature = "storage-fs")] +mod fs; +#[cfg(feature = "storage-gcs")] +mod gcs; +#[cfg(feature = "storage-memory")] +mod memory; +#[cfg(feature = "storage-oss")] +mod oss; +#[cfg(feature = "storage-s3")] +mod s3; + +pub use storage::{OpenDalStorage, OpenDalStorageFactory}; + +#[cfg(feature = "storage-s3")] +pub use s3::CustomAwsCredentialLoader; diff --git a/crates/iceberg/src/io/storage/opendal/memory.rs b/crates/storage/opendal/src/memory.rs similarity index 84% rename from crates/iceberg/src/io/storage/opendal/memory.rs rename to crates/storage/opendal/src/memory.rs index b8023717b6..3b1431091c 100644 --- a/crates/iceberg/src/io/storage/opendal/memory.rs +++ b/crates/storage/opendal/src/memory.rs @@ -18,8 +18,11 @@ use opendal::Operator; use opendal::services::MemoryConfig; -use crate::Result; +use crate::utils::from_opendal_error; +use iceberg::Result; pub(crate) fn memory_config_build() -> Result { - Ok(Operator::from_config(MemoryConfig::default())?.finish()) + Ok(Operator::from_config(MemoryConfig::default()) + .map_err(from_opendal_error)? + .finish()) } diff --git a/crates/iceberg/src/io/storage/opendal/oss.rs b/crates/storage/opendal/src/oss.rs similarity index 89% rename from crates/iceberg/src/io/storage/opendal/oss.rs rename to crates/storage/opendal/src/oss.rs index 98b68f7cde..9dd7d1ce16 100644 --- a/crates/iceberg/src/io/storage/opendal/oss.rs +++ b/crates/storage/opendal/src/oss.rs @@ -21,8 +21,10 @@ use opendal::services::OssConfig; use opendal::{Configurator, Operator}; use url::Url; -use crate::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; -use crate::{Error, ErrorKind, Result}; +use iceberg::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; +use iceberg::{Error, ErrorKind, Result}; + +use crate::utils::from_opendal_error; /// Parse iceberg props to oss config. pub(crate) fn oss_config_parse(mut m: HashMap) -> Result { @@ -52,5 +54,5 @@ pub(crate) fn oss_config_build(cfg: &OssConfig, path: &str) -> Result let builder = cfg.clone().into_builder().bucket(bucket); - Ok(Operator::new(builder)?.finish()) + Ok(Operator::new(builder).map_err(from_opendal_error)?.finish()) } diff --git a/crates/iceberg/src/io/storage/opendal/s3.rs b/crates/storage/opendal/src/s3.rs similarity index 96% rename from crates/iceberg/src/io/storage/opendal/s3.rs rename to crates/storage/opendal/src/s3.rs index 6b4d64e3a6..0f820803cc 100644 --- a/crates/iceberg/src/io/storage/opendal/s3.rs +++ b/crates/storage/opendal/src/s3.rs @@ -25,13 +25,15 @@ pub use reqsign::{AwsCredential, AwsCredentialLoad}; use reqwest::Client; use url::Url; -use crate::io::{ +use iceberg::io::{ CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN, S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD, S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY, - S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE, is_truthy, + S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE, }; -use crate::{Error, ErrorKind, Result}; +use iceberg::{Error, ErrorKind, Result}; + +use crate::utils::{from_opendal_error, is_truthy}; /// Parse iceberg props to s3 config. pub(crate) fn s3_config_parse(mut m: HashMap) -> Result { @@ -141,7 +143,7 @@ pub(crate) fn s3_config_build( .customized_credential_load(customized_credential_load.clone().into_opendal_loader()); } - Ok(Operator::new(builder)?.finish()) + Ok(Operator::new(builder).map_err(from_opendal_error)?.finish()) } /// Custom AWS credential loader. diff --git a/crates/iceberg/src/io/storage/opendal/mod.rs b/crates/storage/opendal/src/storage.rs similarity index 85% rename from crates/iceberg/src/io/storage/opendal/mod.rs rename to crates/storage/opendal/src/storage.rs index 52d00687e0..26067529d5 100644 --- a/crates/iceberg/src/io/storage/opendal/mod.rs +++ b/crates/storage/opendal/src/storage.rs @@ -17,12 +17,11 @@ //! OpenDAL-based storage implementation. -use std::ops::Range; use std::sync::Arc; use async_trait::async_trait; #[cfg(feature = "storage-azdls")] -use azdls::AzureStorageScheme; +use crate::azdls::AzureStorageScheme; use bytes::Bytes; use opendal::Operator; use opendal::layers::RetryLayer; @@ -34,41 +33,28 @@ use opendal::services::GcsConfig; use opendal::services::OssConfig; #[cfg(feature = "storage-s3")] use opendal::services::S3Config; -#[cfg(feature = "storage-s3")] -pub use s3::CustomAwsCredentialLoader; use serde::{Deserialize, Serialize}; -use crate::io::{ +use iceberg::io::{ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, StorageFactory, }; -use crate::{Error, ErrorKind, Result}; +use iceberg::{Error, ErrorKind, Result}; -#[cfg(feature = "storage-azdls")] -mod azdls; -#[cfg(feature = "storage-fs")] -mod fs; -#[cfg(feature = "storage-gcs")] -mod gcs; -#[cfg(feature = "storage-memory")] -mod memory; -#[cfg(feature = "storage-oss")] -mod oss; -#[cfg(feature = "storage-s3")] -mod s3; +use crate::utils::from_opendal_error; #[cfg(feature = "storage-azdls")] -use azdls::*; +use crate::azdls::*; #[cfg(feature = "storage-fs")] -use fs::*; +use crate::fs::*; #[cfg(feature = "storage-gcs")] -use gcs::*; +use crate::gcs::*; #[cfg(feature = "storage-memory")] -use memory::*; +use crate::memory::*; #[cfg(feature = "storage-oss")] -use oss::*; +use crate::oss::*; #[cfg(feature = "storage-s3")] -pub use s3::*; +pub use crate::s3::*; /// OpenDAL-based storage factory. /// @@ -90,7 +76,7 @@ pub enum OpenDalStorageFactory { configured_scheme: String, /// Custom AWS credential loader. #[serde(skip)] - customized_credential_load: Option, + customized_credential_load: Option, }, /// GCS storage factory. #[cfg(feature = "storage-gcs")] @@ -182,7 +168,7 @@ pub enum OpenDalStorage { config: Arc, /// Custom AWS credential loader. #[serde(skip)] - customized_credential_load: Option, + customized_credential_load: Option, }, /// GCS storage variant. #[cfg(feature = "storage-gcs")] @@ -217,7 +203,7 @@ impl OpenDalStorage { /// /// # Arguments /// - /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. + /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`](iceberg::io::FileIO). /// /// # Returns /// @@ -327,12 +313,12 @@ impl OpenDalStorage { impl Storage for OpenDalStorage { async fn exists(&self, path: &str) -> Result { let (op, relative_path) = self.create_operator(&path)?; - Ok(op.exists(relative_path).await?) + Ok(op.exists(relative_path).await.map_err(from_opendal_error)?) } async fn metadata(&self, path: &str) -> Result { let (op, relative_path) = self.create_operator(&path)?; - let meta = op.stat(relative_path).await?; + let meta = op.stat(relative_path).await.map_err(from_opendal_error)?; Ok(FileMetadata { size: meta.content_length(), }) @@ -340,28 +326,45 @@ impl Storage for OpenDalStorage { async fn read(&self, path: &str) -> Result { let (op, relative_path) = self.create_operator(&path)?; - Ok(op.read(relative_path).await?.to_bytes()) + Ok(op + .read(relative_path) + .await + .map_err(from_opendal_error)? + .to_bytes()) } async fn reader(&self, path: &str) -> Result> { let (op, relative_path) = self.create_operator(&path)?; - Ok(Box::new(op.reader(relative_path).await?)) + Ok(Box::new(OpenDalReader( + op.reader(relative_path) + .await + .map_err(from_opendal_error)?, + ))) } async fn write(&self, path: &str, bs: Bytes) -> Result<()> { let (op, relative_path) = self.create_operator(&path)?; - op.write(relative_path, bs).await?; + op.write(relative_path, bs) + .await + .map_err(from_opendal_error)?; Ok(()) } async fn writer(&self, path: &str) -> Result> { let (op, relative_path) = self.create_operator(&path)?; - Ok(Box::new(op.writer(relative_path).await?)) + Ok(Box::new(OpenDalWriter( + op.writer(relative_path) + .await + .map_err(from_opendal_error)?, + ))) } async fn delete(&self, path: &str) -> Result<()> { let (op, relative_path) = self.create_operator(&path)?; - Ok(op.delete(relative_path).await?) + Ok(op + .delete(relative_path) + .await + .map_err(from_opendal_error)?) } async fn delete_prefix(&self, path: &str) -> Result<()> { @@ -371,7 +374,7 @@ impl Storage for OpenDalStorage { } else { format!("{relative_path}/") }; - Ok(op.remove_all(&path).await?) + Ok(op.remove_all(&path).await.map_err(from_opendal_error)?) } #[allow(unreachable_code, unused_variables)] @@ -385,23 +388,38 @@ impl Storage for OpenDalStorage { } } -// OpenDAL implementations for FileRead and FileWrite traits +// Newtype wrappers for opendal types to satisfy orphan rules. +// We can't implement iceberg's FileRead/FileWrite traits directly on opendal's +// Reader/Writer since neither trait nor type is defined in this crate. + +/// Wrapper around `opendal::Reader` that implements `FileRead`. +pub(crate) struct OpenDalReader(pub(crate) opendal::Reader); #[async_trait] -impl FileRead for opendal::Reader { - async fn read(&self, range: Range) -> Result { - Ok(opendal::Reader::read(self, range).await?.to_bytes()) +impl FileRead for OpenDalReader { + async fn read(&self, range: std::ops::Range) -> Result { + Ok(opendal::Reader::read(&self.0, range) + .await + .map_err(from_opendal_error)? + .to_bytes()) } } +/// Wrapper around `opendal::Writer` that implements `FileWrite`. +pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer); + #[async_trait] -impl FileWrite for opendal::Writer { +impl FileWrite for OpenDalWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - Ok(opendal::Writer::write(self, bs).await?) + Ok(opendal::Writer::write(&mut self.0, bs) + .await + .map_err(from_opendal_error)?) } async fn close(&mut self) -> Result<()> { - let _ = opendal::Writer::close(self).await?; + let _ = opendal::Writer::close(&mut self.0) + .await + .map_err(from_opendal_error)?; Ok(()) } } diff --git a/crates/storage/opendal/src/utils.rs b/crates/storage/opendal/src/utils.rs new file mode 100644 index 0000000000..b929452e6c --- /dev/null +++ b/crates/storage/opendal/src/utils.rs @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(crate) fn is_truthy(value: &str) -> bool { + ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str()) +} + +/// Convert an opendal error into an iceberg error. +pub(crate) fn from_opendal_error(e: opendal::Error) -> iceberg::Error { + iceberg::Error::new(iceberg::ErrorKind::Unexpected, "Failure in doing io operation") + .with_source(e) +} diff --git a/crates/iceberg/tests/file_io_s3_test.rs b/crates/storage/opendal/tests/file_io_s3_test.rs similarity index 97% rename from crates/iceberg/tests/file_io_s3_test.rs rename to crates/storage/opendal/tests/file_io_s3_test.rs index 1c029cdbc3..5706473721 100644 --- a/crates/iceberg/tests/file_io_s3_test.rs +++ b/crates/storage/opendal/tests/file_io_s3_test.rs @@ -19,15 +19,15 @@ //! //! These tests assume Docker containers are started externally via `make docker-up`. //! Each test uses unique file paths based on module path to avoid conflicts. -#[cfg(all(test, feature = "storage-s3"))] +#[cfg(test)] mod tests { use std::sync::Arc; use async_trait::async_trait; use iceberg::io::{ - CustomAwsCredentialLoader, FileIO, FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, - S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, + FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, }; + use iceberg_storage_opendal::{CustomAwsCredentialLoader, OpenDalStorageFactory}; use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up}; use reqsign::{AwsCredential, AwsCredentialLoad}; use reqwest::Client; From 3f8029aa612df2d04f9a3dd5e7eab66745aa9a5c Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 4 Mar 2026 15:02:28 -0800 Subject: [PATCH 2/9] minor --- crates/iceberg/src/catalog/memory/catalog.rs | 12 ++++-------- .../opendal}/tests/file_io_gcs_test.rs | 7 +++---- crates/storage/opendal/tests/file_io_s3_test.rs | 2 +- 3 files changed, 8 insertions(+), 13 deletions(-) rename crates/{iceberg => storage/opendal}/tests/file_io_gcs_test.rs (95%) diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 32ed8ade67..e008de8050 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -25,7 +25,7 @@ use futures::lock::{Mutex, MutexGuard}; use itertools::Itertools; use super::namespace_state::NamespaceState; -use crate::io::{FileIO, FileIOBuilder, StorageFactory}; +use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory}; use crate::spec::{TableMetadata, TableMetadataBuilder}; use crate::table::Table; use crate::{ @@ -128,16 +128,12 @@ impl MemoryCatalog { config: MemoryCatalogConfig, storage_factory: Option>, ) -> Result { - let file_io = match storage_factory { - Some(factory) => FileIOBuilder::new(factory) - .with_props(config.props) - .build(), - None => FileIO::new_with_memory(), - }; + // Use provided factory or default to MemoryStorageFactory + let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory)); Ok(Self { root_namespace_state: Mutex::new(NamespaceState::default()), - file_io, + file_io: FileIOBuilder::new(factory).with_props(config.props).build(), warehouse_location: config.warehouse, }) } diff --git a/crates/iceberg/tests/file_io_gcs_test.rs b/crates/storage/opendal/tests/file_io_gcs_test.rs similarity index 95% rename from crates/iceberg/tests/file_io_gcs_test.rs rename to crates/storage/opendal/tests/file_io_gcs_test.rs index 75bc9fae12..158c4413d6 100644 --- a/crates/iceberg/tests/file_io_gcs_test.rs +++ b/crates/storage/opendal/tests/file_io_gcs_test.rs @@ -19,15 +19,14 @@ //! //! These tests assume Docker containers are started externally via `make docker-up`. -#[cfg(all(test, feature = "storage-gcs"))] +#[cfg(feature = "storage-gcs")] mod tests { use std::collections::HashMap; use std::sync::Arc; use bytes::Bytes; - use iceberg::io::{ - FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH, OpenDalStorageFactory, - }; + use iceberg::io::{FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH}; + use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg_test_utils::{get_gcs_endpoint, set_up}; static FAKE_GCS_BUCKET: &str = "test-bucket"; diff --git a/crates/storage/opendal/tests/file_io_s3_test.rs b/crates/storage/opendal/tests/file_io_s3_test.rs index 5706473721..a22c96e0a2 100644 --- a/crates/storage/opendal/tests/file_io_s3_test.rs +++ b/crates/storage/opendal/tests/file_io_s3_test.rs @@ -19,7 +19,7 @@ //! //! These tests assume Docker containers are started externally via `make docker-up`. //! Each test uses unique file paths based on module path to avoid conflicts. -#[cfg(test)] +#[cfg(feature = "storage-s3")] mod tests { use std::sync::Arc; From 6e31b1a5763b105c1f5b1832e16221fcbca051e8 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 4 Mar 2026 15:14:06 -0800 Subject: [PATCH 3/9] clean up --- crates/catalog/glue/src/catalog.rs | 2 +- .../catalog/glue/tests/glue_catalog_test.rs | 2 +- crates/catalog/hms/tests/hms_catalog_test.rs | 2 +- crates/catalog/s3tables/src/catalog.rs | 2 +- crates/examples/src/oss_backend.rs | 2 +- crates/iceberg/src/io/storage/mod.rs | 8 - crates/integration_tests/tests/common/mod.rs | 2 +- .../tests/conflict_commit_test.rs | 2 +- .../tests/read_evolved_schema.rs | 2 +- .../tests/read_positional_deletes.rs | 2 +- crates/storage/opendal/src/azdls.rs | 13 +- crates/storage/opendal/src/fs.rs | 2 +- crates/storage/opendal/src/gcs.rs | 11 +- crates/storage/opendal/src/lib.rs | 409 +++++++++++++++- crates/storage/opendal/src/memory.rs | 2 +- crates/storage/opendal/src/oss.rs | 5 +- crates/storage/opendal/src/s3.rs | 11 +- crates/storage/opendal/src/storage.rs | 437 ------------------ crates/storage/opendal/src/utils.rs | 7 +- 19 files changed, 443 insertions(+), 480 deletions(-) delete mode 100644 crates/storage/opendal/src/storage.rs diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 31d9293ef4..bf2f392330 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -28,13 +28,13 @@ use iceberg::io::{ FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, StorageFactory, }; -use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; +use iceberg_storage_opendal::OpenDalStorageFactory; use crate::error::{from_aws_build_error, from_aws_sdk_error}; use crate::utils::{ diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs b/crates/catalog/glue/tests/glue_catalog_test.rs index b29da8fe7d..0b7dbe9f23 100644 --- a/crates/catalog/glue/tests/glue_catalog_test.rs +++ b/crates/catalog/glue/tests/glue_catalog_test.rs @@ -24,7 +24,6 @@ use std::collections::HashMap; use std::sync::Arc; use iceberg::io::{FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; -use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::{ @@ -34,6 +33,7 @@ use iceberg_catalog_glue::{ AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, GLUE_CATALOG_PROP_URI, GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalog, GlueCatalogBuilder, }; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg_test_utils::{ cleanup_namespace, get_glue_endpoint, get_minio_endpoint, normalize_test_name_with_parts, set_up, diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index 16b99e7e52..74c9e52e92 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -24,13 +24,13 @@ use std::collections::HashMap; use std::sync::Arc; use iceberg::io::{FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; -use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; use iceberg_catalog_hms::{ HMS_CATALOG_PROP_THRIFT_TRANSPORT, HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE, HmsCatalog, HmsCatalogBuilder, THRIFT_TRANSPORT_BUFFERED, }; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg_test_utils::{ cleanup_namespace, get_hms_endpoint, get_minio_endpoint, normalize_test_name_with_parts, set_up, }; diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 54cd1efc0b..afe28ae453 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -27,13 +27,13 @@ use aws_sdk_s3tables::operation::list_tables::ListTablesOutput; use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError; use aws_sdk_s3tables::types::OpenTableFormat; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; -use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; +use iceberg_storage_opendal::OpenDalStorageFactory; use crate::utils::create_sdk_config; diff --git a/crates/examples/src/oss_backend.rs b/crates/examples/src/oss_backend.rs index 0cf833f5ac..2d56cceb7f 100644 --- a/crates/examples/src/oss_backend.rs +++ b/crates/examples/src/oss_backend.rs @@ -19,9 +19,9 @@ use std::collections::HashMap; use std::sync::Arc; use futures::stream::StreamExt; -use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableIdent}; use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalogBuilder}; +use iceberg_storage_opendal::OpenDalStorageFactory; // Configure these values according to your environment diff --git a/crates/iceberg/src/io/storage/mod.rs b/crates/iceberg/src/io/storage/mod.rs index 2b4f9311b1..28ccf12fa9 100644 --- a/crates/iceberg/src/io/storage/mod.rs +++ b/crates/iceberg/src/io/storage/mod.rs @@ -66,10 +66,6 @@ use crate::Result; /// // ... implement other methods /// } /// -/// TODO remove below when the trait is integrated with FileIO and Catalog -/// # NOTE -/// This trait is under heavy development and is not used anywhere as of now -/// Please DO NOT implement it /// ``` #[async_trait] #[typetag::serde(tag = "type")] @@ -127,10 +123,6 @@ pub trait Storage: Debug + Send + Sync { /// } /// } /// -/// TODO remove below when the trait is integrated with FileIO and Catalog -/// # NOTE -/// This trait is under heavy development and is not used anywhere as of now -/// Please DO NOT implement it /// ``` #[typetag::serde(tag = "type")] pub trait StorageFactory: Debug + Send + Sync { diff --git a/crates/integration_tests/tests/common/mod.rs b/crates/integration_tests/tests/common/mod.rs index 8122baee01..e49a57465c 100644 --- a/crates/integration_tests/tests/common/mod.rs +++ b/crates/integration_tests/tests/common/mod.rs @@ -18,11 +18,11 @@ use std::collections::HashMap; use std::sync::Arc; -use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent}; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_integration_tests::get_test_fixture; +use iceberg_storage_opendal::OpenDalStorageFactory; pub async fn random_ns() -> Namespace { let fixture = get_test_fixture(); diff --git a/crates/integration_tests/tests/conflict_commit_test.rs b/crates/integration_tests/tests/conflict_commit_test.rs index 268e94aaf2..3b1362b95d 100644 --- a/crates/integration_tests/tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/conflict_commit_test.rs @@ -24,7 +24,6 @@ use std::sync::Arc; use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use common::{random_ns, test_schema}; use futures::TryStreamExt; -use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; @@ -36,6 +35,7 @@ use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, CatalogBuilder, TableCreation}; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_integration_tests::get_test_fixture; +use iceberg_storage_opendal::OpenDalStorageFactory; use parquet::file::properties::WriterProperties; #[tokio::test] diff --git a/crates/integration_tests/tests/read_evolved_schema.rs b/crates/integration_tests/tests/read_evolved_schema.rs index 3efc071ae0..ae25a08987 100644 --- a/crates/integration_tests/tests/read_evolved_schema.rs +++ b/crates/integration_tests/tests/read_evolved_schema.rs @@ -22,11 +22,11 @@ use std::sync::Arc; use arrow_array::{Decimal128Array, Float64Array, Int64Array, StringArray}; use futures::TryStreamExt; use iceberg::expr::Reference; -use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::spec::Datum; use iceberg::{Catalog, CatalogBuilder, TableIdent}; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_integration_tests::get_test_fixture; +use iceberg_storage_opendal::OpenDalStorageFactory; use ordered_float::OrderedFloat; #[tokio::test] diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/read_positional_deletes.rs index 54da6b5bd4..d4c4afeaf3 100644 --- a/crates/integration_tests/tests/read_positional_deletes.rs +++ b/crates/integration_tests/tests/read_positional_deletes.rs @@ -20,10 +20,10 @@ use std::sync::Arc; use futures::TryStreamExt; -use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::{Catalog, CatalogBuilder, TableIdent}; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_integration_tests::get_test_fixture; +use iceberg_storage_opendal::OpenDalStorageFactory; #[tokio::test] async fn test_read_table_with_positional_deletes() { diff --git a/crates/storage/opendal/src/azdls.rs b/crates/storage/opendal/src/azdls.rs index c27a9aef44..bda07500d0 100644 --- a/crates/storage/opendal/src/azdls.rs +++ b/crates/storage/opendal/src/azdls.rs @@ -19,16 +19,15 @@ use std::collections::HashMap; use std::fmt::Display; use std::str::FromStr; -use opendal::Configurator; -use opendal::services::AzdlsConfig; -use serde::{Deserialize, Serialize}; -use url::Url; - use iceberg::io::{ ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_AUTHORITY_HOST, ADLS_CLIENT_ID, ADLS_CLIENT_SECRET, ADLS_CONNECTION_STRING, ADLS_SAS_TOKEN, ADLS_TENANT_ID, }; use iceberg::{Error, ErrorKind, Result}; +use opendal::Configurator; +use opendal::services::AzdlsConfig; +use serde::{Deserialize, Serialize}; +use url::Url; use crate::utils::from_opendal_error; @@ -213,7 +212,9 @@ fn azdls_config_build(config: &AzdlsConfig, path: &AzureStoragePath) -> Result Result { diff --git a/crates/storage/opendal/src/gcs.rs b/crates/storage/opendal/src/gcs.rs index 9c2cbe9dba..7d4738c902 100644 --- a/crates/storage/opendal/src/gcs.rs +++ b/crates/storage/opendal/src/gcs.rs @@ -18,15 +18,14 @@ use std::collections::HashMap; -use opendal::Operator; -use opendal::services::GcsConfig; -use url::Url; - use iceberg::io::{ GCS_ALLOW_ANONYMOUS, GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, GCS_DISABLE_VM_METADATA, GCS_NO_AUTH, GCS_SERVICE_PATH, GCS_TOKEN, }; use iceberg::{Error, ErrorKind, Result}; +use opendal::Operator; +use opendal::services::GcsConfig; +use url::Url; use crate::utils::{from_opendal_error, is_truthy}; @@ -83,5 +82,7 @@ pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result let mut cfg = cfg.clone(); cfg.bucket = bucket.to_string(); - Ok(Operator::from_config(cfg).map_err(from_opendal_error)?.finish()) + Ok(Operator::from_config(cfg) + .map_err(from_opendal_error)? + .finish()) } diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index 85f674eed8..3aeab768cf 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -22,7 +22,6 @@ //! [`StorageFactory`](iceberg::io::StorageFactory) traits from the `iceberg` crate //! using [OpenDAL](https://opendal.apache.org/) as the backend. -mod storage; mod utils; #[cfg(feature = "storage-azdls")] @@ -38,7 +37,413 @@ mod oss; #[cfg(feature = "storage-s3")] mod s3; -pub use storage::{OpenDalStorage, OpenDalStorageFactory}; +use std::sync::Arc; +use async_trait::async_trait; +#[cfg(feature = "storage-azdls")] +use azdls::AzureStorageScheme; +#[cfg(feature = "storage-azdls")] +use azdls::*; +use bytes::Bytes; +#[cfg(feature = "storage-fs")] +use fs::*; +#[cfg(feature = "storage-gcs")] +use gcs::*; +use iceberg::io::{ + FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, + StorageFactory, +}; +use iceberg::{Error, ErrorKind, Result}; +#[cfg(feature = "storage-memory")] +use memory::*; +use opendal::Operator; +use opendal::layers::RetryLayer; +#[cfg(feature = "storage-azdls")] +use opendal::services::AzdlsConfig; +#[cfg(feature = "storage-gcs")] +use opendal::services::GcsConfig; +#[cfg(feature = "storage-oss")] +use opendal::services::OssConfig; +#[cfg(feature = "storage-s3")] +use opendal::services::S3Config; +#[cfg(feature = "storage-oss")] +use oss::*; #[cfg(feature = "storage-s3")] pub use s3::CustomAwsCredentialLoader; +#[cfg(feature = "storage-s3")] +pub use s3::*; +use serde::{Deserialize, Serialize}; +use utils::from_opendal_error; + +/// OpenDAL-based storage factory. +/// +/// Maps scheme to the corresponding OpenDalStorage storage variant. +/// Use this factory with `FileIOBuilder::new(factory)` to create FileIO instances. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum OpenDalStorageFactory { + /// Memory storage factory. + #[cfg(feature = "storage-memory")] + Memory, + /// Local filesystem storage factory. + #[cfg(feature = "storage-fs")] + Fs, + /// S3 storage factory. + #[cfg(feature = "storage-s3")] + S3 { + /// s3 storage could have `s3://` and `s3a://`. + /// Storing the scheme string here to return the correct path. + configured_scheme: String, + /// Custom AWS credential loader. + #[serde(skip)] + customized_credential_load: Option, + }, + /// GCS storage factory. + #[cfg(feature = "storage-gcs")] + Gcs, + /// OSS storage factory. + #[cfg(feature = "storage-oss")] + Oss, + /// Azure Data Lake Storage factory. + #[cfg(feature = "storage-azdls")] + Azdls { + /// The configured Azure storage scheme. + configured_scheme: AzureStorageScheme, + }, +} + +#[typetag::serde(name = "OpenDalStorageFactory")] +impl StorageFactory for OpenDalStorageFactory { + #[allow(unused_variables)] + fn build(&self, config: &StorageConfig) -> Result> { + match self { + #[cfg(feature = "storage-memory")] + OpenDalStorageFactory::Memory => { + Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?))) + } + #[cfg(feature = "storage-fs")] + OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)), + #[cfg(feature = "storage-s3")] + OpenDalStorageFactory::S3 { + configured_scheme, + customized_credential_load, + } => Ok(Arc::new(OpenDalStorage::S3 { + configured_scheme: configured_scheme.clone(), + config: s3_config_parse(config.props().clone())?.into(), + customized_credential_load: customized_credential_load.clone(), + })), + #[cfg(feature = "storage-gcs")] + OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs { + config: gcs_config_parse(config.props().clone())?.into(), + })), + #[cfg(feature = "storage-oss")] + OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss { + config: oss_config_parse(config.props().clone())?.into(), + })), + #[cfg(feature = "storage-azdls")] + OpenDalStorageFactory::Azdls { configured_scheme } => { + Ok(Arc::new(OpenDalStorage::Azdls { + configured_scheme: configured_scheme.clone(), + config: azdls_config_parse(config.props().clone())?.into(), + })) + } + #[cfg(all( + not(feature = "storage-memory"), + not(feature = "storage-fs"), + not(feature = "storage-s3"), + not(feature = "storage-gcs"), + not(feature = "storage-oss"), + not(feature = "storage-azdls"), + ))] + _ => Err(Error::new( + ErrorKind::FeatureUnsupported, + "No storage service has been enabled", + )), + } + } +} + +/// Default memory operator for serde deserialization. +#[cfg(feature = "storage-memory")] +fn default_memory_operator() -> Operator { + memory_config_build().expect("Failed to create default memory operator") +} + +/// OpenDAL-based storage implementation. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum OpenDalStorage { + /// Memory storage variant. + #[cfg(feature = "storage-memory")] + Memory(#[serde(skip, default = "self::default_memory_operator")] Operator), + /// Local filesystem storage variant. + #[cfg(feature = "storage-fs")] + LocalFs, + /// S3 storage variant. + #[cfg(feature = "storage-s3")] + S3 { + /// s3 storage could have `s3://` and `s3a://`. + /// Storing the scheme string here to return the correct path. + configured_scheme: String, + /// S3 configuration. + config: Arc, + /// Custom AWS credential loader. + #[serde(skip)] + customized_credential_load: Option, + }, + /// GCS storage variant. + #[cfg(feature = "storage-gcs")] + Gcs { + /// GCS configuration. + config: Arc, + }, + /// OSS storage variant. + #[cfg(feature = "storage-oss")] + Oss { + /// OSS configuration. + config: Arc, + }, + /// Azure Data Lake Storage variant. + /// Expects paths of the form + /// `abfs[s]://@.dfs./` or + /// `wasb[s]://@.blob./`. + #[cfg(feature = "storage-azdls")] + #[allow(private_interfaces)] + Azdls { + /// The configured Azure storage scheme. + /// Because Azdls accepts multiple possible schemes, we store the full + /// passed scheme here to later validate schemes passed via paths. + configured_scheme: AzureStorageScheme, + /// Azure DLS configuration. + config: Arc, + }, +} + +impl OpenDalStorage { + /// Creates operator from path. + /// + /// # Arguments + /// + /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`](iceberg::io::FileIO). + /// + /// # Returns + /// + /// The return value consists of two parts: + /// + /// * An [`opendal::Operator`] instance used to operate on file. + /// * Relative path to the root uri of [`opendal::Operator`]. + #[allow(unreachable_code, unused_variables)] + pub(crate) fn create_operator<'a>( + &self, + path: &'a impl AsRef, + ) -> Result<(Operator, &'a str)> { + let path = path.as_ref(); + let (operator, relative_path): (Operator, &str) = match self { + #[cfg(feature = "storage-memory")] + OpenDalStorage::Memory(op) => { + if let Some(stripped) = path.strip_prefix("memory:/") { + (op.clone(), stripped) + } else { + (op.clone(), &path[1..]) + } + } + #[cfg(feature = "storage-fs")] + OpenDalStorage::LocalFs => { + let op = fs_config_build()?; + if let Some(stripped) = path.strip_prefix("file:/") { + (op, stripped) + } else { + (op, &path[1..]) + } + } + #[cfg(feature = "storage-s3")] + OpenDalStorage::S3 { + configured_scheme, + config, + customized_credential_load, + } => { + let op = s3_config_build(config, customized_credential_load, path)?; + let op_info = op.info(); + + // Check prefix of s3 path. + let prefix = format!("{}://{}/", configured_scheme, op_info.name()); + if path.starts_with(&prefix) { + (op, &path[prefix.len()..]) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid s3 url: {path}, should start with {prefix}"), + )); + } + } + #[cfg(feature = "storage-gcs")] + OpenDalStorage::Gcs { config } => { + let operator = gcs_config_build(config, path)?; + let prefix = format!("gs://{}/", operator.info().name()); + if path.starts_with(&prefix) { + (operator, &path[prefix.len()..]) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid gcs url: {path}, should start with {prefix}"), + )); + } + } + #[cfg(feature = "storage-oss")] + OpenDalStorage::Oss { config } => { + let op = oss_config_build(config, path)?; + let prefix = format!("oss://{}/", op.info().name()); + if path.starts_with(&prefix) { + (op, &path[prefix.len()..]) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Invalid oss url: {path}, should start with {prefix}"), + )); + } + } + #[cfg(feature = "storage-azdls")] + OpenDalStorage::Azdls { + configured_scheme, + config, + } => azdls_create_operator(path, config, configured_scheme)?, + #[cfg(all( + not(feature = "storage-s3"), + not(feature = "storage-fs"), + not(feature = "storage-gcs"), + not(feature = "storage-oss"), + not(feature = "storage-azdls"), + ))] + _ => { + return Err(Error::new( + ErrorKind::FeatureUnsupported, + "No storage service has been enabled", + )); + } + }; + + // Transient errors are common for object stores; however there's no + // harm in retrying temporary failures for other storage backends as well. + let operator = operator.layer(RetryLayer::new()); + Ok((operator, relative_path)) + } +} + +#[typetag::serde(name = "OpenDalStorage")] +#[async_trait] +impl Storage for OpenDalStorage { + async fn exists(&self, path: &str) -> Result { + let (op, relative_path) = self.create_operator(&path)?; + Ok(op.exists(relative_path).await.map_err(from_opendal_error)?) + } + + async fn metadata(&self, path: &str) -> Result { + let (op, relative_path) = self.create_operator(&path)?; + let meta = op.stat(relative_path).await.map_err(from_opendal_error)?; + Ok(FileMetadata { + size: meta.content_length(), + }) + } + + async fn read(&self, path: &str) -> Result { + let (op, relative_path) = self.create_operator(&path)?; + Ok(op + .read(relative_path) + .await + .map_err(from_opendal_error)? + .to_bytes()) + } + + async fn reader(&self, path: &str) -> Result> { + let (op, relative_path) = self.create_operator(&path)?; + Ok(Box::new(OpenDalReader( + op.reader(relative_path).await.map_err(from_opendal_error)?, + ))) + } + + async fn write(&self, path: &str, bs: Bytes) -> Result<()> { + let (op, relative_path) = self.create_operator(&path)?; + op.write(relative_path, bs) + .await + .map_err(from_opendal_error)?; + Ok(()) + } + + async fn writer(&self, path: &str) -> Result> { + let (op, relative_path) = self.create_operator(&path)?; + Ok(Box::new(OpenDalWriter( + op.writer(relative_path).await.map_err(from_opendal_error)?, + ))) + } + + async fn delete(&self, path: &str) -> Result<()> { + let (op, relative_path) = self.create_operator(&path)?; + Ok(op.delete(relative_path).await.map_err(from_opendal_error)?) + } + + async fn delete_prefix(&self, path: &str) -> Result<()> { + let (op, relative_path) = self.create_operator(&path)?; + let path = if relative_path.ends_with('/') { + relative_path.to_string() + } else { + format!("{relative_path}/") + }; + Ok(op.remove_all(&path).await.map_err(from_opendal_error)?) + } + + #[allow(unreachable_code, unused_variables)] + fn new_input(&self, path: &str) -> Result { + Ok(InputFile::new(Arc::new(self.clone()), path.to_string())) + } + + #[allow(unreachable_code, unused_variables)] + fn new_output(&self, path: &str) -> Result { + Ok(OutputFile::new(Arc::new(self.clone()), path.to_string())) + } +} + +// Newtype wrappers for opendal types to satisfy orphan rules. +// We can't implement iceberg's FileRead/FileWrite traits directly on opendal's +// Reader/Writer since neither trait nor type is defined in this crate. + +/// Wrapper around `opendal::Reader` that implements `FileRead`. +pub(crate) struct OpenDalReader(pub(crate) opendal::Reader); + +#[async_trait] +impl FileRead for OpenDalReader { + async fn read(&self, range: std::ops::Range) -> Result { + Ok(opendal::Reader::read(&self.0, range) + .await + .map_err(from_opendal_error)? + .to_bytes()) + } +} + +/// Wrapper around `opendal::Writer` that implements `FileWrite`. +pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer); + +#[async_trait] +impl FileWrite for OpenDalWriter { + async fn write(&mut self, bs: Bytes) -> Result<()> { + Ok(opendal::Writer::write(&mut self.0, bs) + .await + .map_err(from_opendal_error)?) + } + + async fn close(&mut self) -> Result<()> { + let _ = opendal::Writer::close(&mut self.0) + .await + .map_err(from_opendal_error)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[cfg(feature = "storage-memory")] + #[test] + fn test_default_memory_operator() { + let op = default_memory_operator(); + assert_eq!(op.info().scheme().to_string(), "memory"); + } +} diff --git a/crates/storage/opendal/src/memory.rs b/crates/storage/opendal/src/memory.rs index 3b1431091c..5957d0e848 100644 --- a/crates/storage/opendal/src/memory.rs +++ b/crates/storage/opendal/src/memory.rs @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. +use iceberg::Result; use opendal::Operator; use opendal::services::MemoryConfig; use crate::utils::from_opendal_error; -use iceberg::Result; pub(crate) fn memory_config_build() -> Result { Ok(Operator::from_config(MemoryConfig::default()) diff --git a/crates/storage/opendal/src/oss.rs b/crates/storage/opendal/src/oss.rs index 9dd7d1ce16..add8b7a0f7 100644 --- a/crates/storage/opendal/src/oss.rs +++ b/crates/storage/opendal/src/oss.rs @@ -17,13 +17,12 @@ use std::collections::HashMap; +use iceberg::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; +use iceberg::{Error, ErrorKind, Result}; use opendal::services::OssConfig; use opendal::{Configurator, Operator}; use url::Url; -use iceberg::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; -use iceberg::{Error, ErrorKind, Result}; - use crate::utils::from_opendal_error; /// Parse iceberg props to oss config. diff --git a/crates/storage/opendal/src/s3.rs b/crates/storage/opendal/src/s3.rs index 0f820803cc..7db88d273f 100644 --- a/crates/storage/opendal/src/s3.rs +++ b/crates/storage/opendal/src/s3.rs @@ -19,12 +19,6 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; -use opendal::services::S3Config; -use opendal::{Configurator, Operator}; -pub use reqsign::{AwsCredential, AwsCredentialLoad}; -use reqwest::Client; -use url::Url; - use iceberg::io::{ CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN, S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD, @@ -32,6 +26,11 @@ use iceberg::io::{ S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE, }; use iceberg::{Error, ErrorKind, Result}; +use opendal::services::S3Config; +use opendal::{Configurator, Operator}; +pub use reqsign::{AwsCredential, AwsCredentialLoad}; +use reqwest::Client; +use url::Url; use crate::utils::{from_opendal_error, is_truthy}; diff --git a/crates/storage/opendal/src/storage.rs b/crates/storage/opendal/src/storage.rs deleted file mode 100644 index 26067529d5..0000000000 --- a/crates/storage/opendal/src/storage.rs +++ /dev/null @@ -1,437 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! OpenDAL-based storage implementation. - -use std::sync::Arc; - -use async_trait::async_trait; -#[cfg(feature = "storage-azdls")] -use crate::azdls::AzureStorageScheme; -use bytes::Bytes; -use opendal::Operator; -use opendal::layers::RetryLayer; -#[cfg(feature = "storage-azdls")] -use opendal::services::AzdlsConfig; -#[cfg(feature = "storage-gcs")] -use opendal::services::GcsConfig; -#[cfg(feature = "storage-oss")] -use opendal::services::OssConfig; -#[cfg(feature = "storage-s3")] -use opendal::services::S3Config; -use serde::{Deserialize, Serialize}; - -use iceberg::io::{ - FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, - StorageFactory, -}; -use iceberg::{Error, ErrorKind, Result}; - -use crate::utils::from_opendal_error; - -#[cfg(feature = "storage-azdls")] -use crate::azdls::*; -#[cfg(feature = "storage-fs")] -use crate::fs::*; -#[cfg(feature = "storage-gcs")] -use crate::gcs::*; -#[cfg(feature = "storage-memory")] -use crate::memory::*; -#[cfg(feature = "storage-oss")] -use crate::oss::*; -#[cfg(feature = "storage-s3")] -pub use crate::s3::*; - -/// OpenDAL-based storage factory. -/// -/// Maps scheme to the corresponding OpenDalStorage storage variant. -/// Use this factory with `FileIOBuilder::new(factory)` to create FileIO instances. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum OpenDalStorageFactory { - /// Memory storage factory. - #[cfg(feature = "storage-memory")] - Memory, - /// Local filesystem storage factory. - #[cfg(feature = "storage-fs")] - Fs, - /// S3 storage factory. - #[cfg(feature = "storage-s3")] - S3 { - /// s3 storage could have `s3://` and `s3a://`. - /// Storing the scheme string here to return the correct path. - configured_scheme: String, - /// Custom AWS credential loader. - #[serde(skip)] - customized_credential_load: Option, - }, - /// GCS storage factory. - #[cfg(feature = "storage-gcs")] - Gcs, - /// OSS storage factory. - #[cfg(feature = "storage-oss")] - Oss, - /// Azure Data Lake Storage factory. - #[cfg(feature = "storage-azdls")] - Azdls { - /// The configured Azure storage scheme. - configured_scheme: AzureStorageScheme, - }, -} - -#[typetag::serde(name = "OpenDalStorageFactory")] -impl StorageFactory for OpenDalStorageFactory { - #[allow(unused_variables)] - fn build(&self, config: &StorageConfig) -> Result> { - match self { - #[cfg(feature = "storage-memory")] - OpenDalStorageFactory::Memory => { - Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?))) - } - #[cfg(feature = "storage-fs")] - OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)), - #[cfg(feature = "storage-s3")] - OpenDalStorageFactory::S3 { - configured_scheme, - customized_credential_load, - } => Ok(Arc::new(OpenDalStorage::S3 { - configured_scheme: configured_scheme.clone(), - config: s3_config_parse(config.props().clone())?.into(), - customized_credential_load: customized_credential_load.clone(), - })), - #[cfg(feature = "storage-gcs")] - OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs { - config: gcs_config_parse(config.props().clone())?.into(), - })), - #[cfg(feature = "storage-oss")] - OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss { - config: oss_config_parse(config.props().clone())?.into(), - })), - #[cfg(feature = "storage-azdls")] - OpenDalStorageFactory::Azdls { configured_scheme } => { - Ok(Arc::new(OpenDalStorage::Azdls { - configured_scheme: configured_scheme.clone(), - config: azdls_config_parse(config.props().clone())?.into(), - })) - } - #[cfg(all( - not(feature = "storage-memory"), - not(feature = "storage-fs"), - not(feature = "storage-s3"), - not(feature = "storage-gcs"), - not(feature = "storage-oss"), - not(feature = "storage-azdls"), - ))] - _ => Err(Error::new( - ErrorKind::FeatureUnsupported, - "No storage service has been enabled", - )), - } - } -} - -/// Default memory operator for serde deserialization. -#[cfg(feature = "storage-memory")] -fn default_memory_operator() -> Operator { - memory_config_build().expect("Failed to create default memory operator") -} - -/// OpenDAL-based storage implementation. -#[derive(Clone, Debug, Serialize, Deserialize)] -pub enum OpenDalStorage { - /// Memory storage variant. - #[cfg(feature = "storage-memory")] - Memory(#[serde(skip, default = "self::default_memory_operator")] Operator), - /// Local filesystem storage variant. - #[cfg(feature = "storage-fs")] - LocalFs, - /// S3 storage variant. - #[cfg(feature = "storage-s3")] - S3 { - /// s3 storage could have `s3://` and `s3a://`. - /// Storing the scheme string here to return the correct path. - configured_scheme: String, - /// S3 configuration. - config: Arc, - /// Custom AWS credential loader. - #[serde(skip)] - customized_credential_load: Option, - }, - /// GCS storage variant. - #[cfg(feature = "storage-gcs")] - Gcs { - /// GCS configuration. - config: Arc, - }, - /// OSS storage variant. - #[cfg(feature = "storage-oss")] - Oss { - /// OSS configuration. - config: Arc, - }, - /// Azure Data Lake Storage variant. - /// Expects paths of the form - /// `abfs[s]://@.dfs./` or - /// `wasb[s]://@.blob./`. - #[cfg(feature = "storage-azdls")] - #[allow(private_interfaces)] - Azdls { - /// The configured Azure storage scheme. - /// Because Azdls accepts multiple possible schemes, we store the full - /// passed scheme here to later validate schemes passed via paths. - configured_scheme: AzureStorageScheme, - /// Azure DLS configuration. - config: Arc, - }, -} - -impl OpenDalStorage { - /// Creates operator from path. - /// - /// # Arguments - /// - /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`](iceberg::io::FileIO). - /// - /// # Returns - /// - /// The return value consists of two parts: - /// - /// * An [`opendal::Operator`] instance used to operate on file. - /// * Relative path to the root uri of [`opendal::Operator`]. - #[allow(unreachable_code, unused_variables)] - pub(crate) fn create_operator<'a>( - &self, - path: &'a impl AsRef, - ) -> Result<(Operator, &'a str)> { - let path = path.as_ref(); - let (operator, relative_path): (Operator, &str) = match self { - #[cfg(feature = "storage-memory")] - OpenDalStorage::Memory(op) => { - if let Some(stripped) = path.strip_prefix("memory:/") { - (op.clone(), stripped) - } else { - (op.clone(), &path[1..]) - } - } - #[cfg(feature = "storage-fs")] - OpenDalStorage::LocalFs => { - let op = fs_config_build()?; - if let Some(stripped) = path.strip_prefix("file:/") { - (op, stripped) - } else { - (op, &path[1..]) - } - } - #[cfg(feature = "storage-s3")] - OpenDalStorage::S3 { - configured_scheme, - config, - customized_credential_load, - } => { - let op = s3_config_build(config, customized_credential_load, path)?; - let op_info = op.info(); - - // Check prefix of s3 path. - let prefix = format!("{}://{}/", configured_scheme, op_info.name()); - if path.starts_with(&prefix) { - (op, &path[prefix.len()..]) - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid s3 url: {path}, should start with {prefix}"), - )); - } - } - #[cfg(feature = "storage-gcs")] - OpenDalStorage::Gcs { config } => { - let operator = gcs_config_build(config, path)?; - let prefix = format!("gs://{}/", operator.info().name()); - if path.starts_with(&prefix) { - (operator, &path[prefix.len()..]) - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid gcs url: {path}, should start with {prefix}"), - )); - } - } - #[cfg(feature = "storage-oss")] - OpenDalStorage::Oss { config } => { - let op = oss_config_build(config, path)?; - let prefix = format!("oss://{}/", op.info().name()); - if path.starts_with(&prefix) { - (op, &path[prefix.len()..]) - } else { - return Err(Error::new( - ErrorKind::DataInvalid, - format!("Invalid oss url: {path}, should start with {prefix}"), - )); - } - } - #[cfg(feature = "storage-azdls")] - OpenDalStorage::Azdls { - configured_scheme, - config, - } => azdls_create_operator(path, config, configured_scheme)?, - #[cfg(all( - not(feature = "storage-s3"), - not(feature = "storage-fs"), - not(feature = "storage-gcs"), - not(feature = "storage-oss"), - not(feature = "storage-azdls"), - ))] - _ => { - return Err(Error::new( - ErrorKind::FeatureUnsupported, - "No storage service has been enabled", - )); - } - }; - - // Transient errors are common for object stores; however there's no - // harm in retrying temporary failures for other storage backends as well. - let operator = operator.layer(RetryLayer::new()); - Ok((operator, relative_path)) - } -} - -#[typetag::serde(name = "OpenDalStorage")] -#[async_trait] -impl Storage for OpenDalStorage { - async fn exists(&self, path: &str) -> Result { - let (op, relative_path) = self.create_operator(&path)?; - Ok(op.exists(relative_path).await.map_err(from_opendal_error)?) - } - - async fn metadata(&self, path: &str) -> Result { - let (op, relative_path) = self.create_operator(&path)?; - let meta = op.stat(relative_path).await.map_err(from_opendal_error)?; - Ok(FileMetadata { - size: meta.content_length(), - }) - } - - async fn read(&self, path: &str) -> Result { - let (op, relative_path) = self.create_operator(&path)?; - Ok(op - .read(relative_path) - .await - .map_err(from_opendal_error)? - .to_bytes()) - } - - async fn reader(&self, path: &str) -> Result> { - let (op, relative_path) = self.create_operator(&path)?; - Ok(Box::new(OpenDalReader( - op.reader(relative_path) - .await - .map_err(from_opendal_error)?, - ))) - } - - async fn write(&self, path: &str, bs: Bytes) -> Result<()> { - let (op, relative_path) = self.create_operator(&path)?; - op.write(relative_path, bs) - .await - .map_err(from_opendal_error)?; - Ok(()) - } - - async fn writer(&self, path: &str) -> Result> { - let (op, relative_path) = self.create_operator(&path)?; - Ok(Box::new(OpenDalWriter( - op.writer(relative_path) - .await - .map_err(from_opendal_error)?, - ))) - } - - async fn delete(&self, path: &str) -> Result<()> { - let (op, relative_path) = self.create_operator(&path)?; - Ok(op - .delete(relative_path) - .await - .map_err(from_opendal_error)?) - } - - async fn delete_prefix(&self, path: &str) -> Result<()> { - let (op, relative_path) = self.create_operator(&path)?; - let path = if relative_path.ends_with('/') { - relative_path.to_string() - } else { - format!("{relative_path}/") - }; - Ok(op.remove_all(&path).await.map_err(from_opendal_error)?) - } - - #[allow(unreachable_code, unused_variables)] - fn new_input(&self, path: &str) -> Result { - Ok(InputFile::new(Arc::new(self.clone()), path.to_string())) - } - - #[allow(unreachable_code, unused_variables)] - fn new_output(&self, path: &str) -> Result { - Ok(OutputFile::new(Arc::new(self.clone()), path.to_string())) - } -} - -// Newtype wrappers for opendal types to satisfy orphan rules. -// We can't implement iceberg's FileRead/FileWrite traits directly on opendal's -// Reader/Writer since neither trait nor type is defined in this crate. - -/// Wrapper around `opendal::Reader` that implements `FileRead`. -pub(crate) struct OpenDalReader(pub(crate) opendal::Reader); - -#[async_trait] -impl FileRead for OpenDalReader { - async fn read(&self, range: std::ops::Range) -> Result { - Ok(opendal::Reader::read(&self.0, range) - .await - .map_err(from_opendal_error)? - .to_bytes()) - } -} - -/// Wrapper around `opendal::Writer` that implements `FileWrite`. -pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer); - -#[async_trait] -impl FileWrite for OpenDalWriter { - async fn write(&mut self, bs: Bytes) -> Result<()> { - Ok(opendal::Writer::write(&mut self.0, bs) - .await - .map_err(from_opendal_error)?) - } - - async fn close(&mut self) -> Result<()> { - let _ = opendal::Writer::close(&mut self.0) - .await - .map_err(from_opendal_error)?; - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[cfg(feature = "storage-memory")] - #[test] - fn test_default_memory_operator() { - let op = default_memory_operator(); - assert_eq!(op.info().scheme().to_string(), "memory"); - } -} diff --git a/crates/storage/opendal/src/utils.rs b/crates/storage/opendal/src/utils.rs index b929452e6c..56f8c18059 100644 --- a/crates/storage/opendal/src/utils.rs +++ b/crates/storage/opendal/src/utils.rs @@ -21,6 +21,9 @@ pub(crate) fn is_truthy(value: &str) -> bool { /// Convert an opendal error into an iceberg error. pub(crate) fn from_opendal_error(e: opendal::Error) -> iceberg::Error { - iceberg::Error::new(iceberg::ErrorKind::Unexpected, "Failure in doing io operation") - .with_source(e) + iceberg::Error::new( + iceberg::ErrorKind::Unexpected, + "Failure in doing io operation", + ) + .with_source(e) } From f8fe4bf5db3410099ae8f6e1c918a10a0c6390fc Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 4 Mar 2026 15:30:22 -0800 Subject: [PATCH 4/9] update feature flag name --- bindings/python/Cargo.toml | 2 +- crates/catalog/glue/Cargo.toml | 2 +- crates/catalog/hms/Cargo.toml | 2 +- crates/catalog/s3tables/Cargo.toml | 2 +- crates/examples/Cargo.toml | 2 +- crates/iceberg/src/io/storage/mod.rs | 2 - crates/integration_tests/Cargo.toml | 2 +- crates/storage/opendal/Cargo.toml | 16 +-- crates/storage/opendal/src/azdls.rs | 5 +- crates/storage/opendal/src/lib.rs | 110 +++++++++--------- .../storage/opendal/tests/file_io_gcs_test.rs | 2 +- .../storage/opendal/tests/file_io_s3_test.rs | 2 +- 12 files changed, 74 insertions(+), 75 deletions(-) diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 7d28cb0bb2..bcc4f42681 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -33,7 +33,7 @@ crate-type = ["cdylib"] [dependencies] arrow = { version = "57.1", features = ["pyarrow", "chrono-tz"] } iceberg = { path = "../../crates/iceberg" } -iceberg-storage-opendal = { path = "../../crates/storage/opendal", features = ["storage-s3", "storage-fs", "storage-memory"] } +iceberg-storage-opendal = { path = "../../crates/storage/opendal", features = ["opendal-s3", "opendal-fs", "opendal-memory"] } pyo3 = { version = "0.26", features = ["extension-module", "abi3-py310"] } iceberg-datafusion = { path = "../../crates/integrations/datafusion" } datafusion-ffi = { version = "52.1" } diff --git a/crates/catalog/glue/Cargo.toml b/crates/catalog/glue/Cargo.toml index d4efbf54e3..e41253de36 100644 --- a/crates/catalog/glue/Cargo.toml +++ b/crates/catalog/glue/Cargo.toml @@ -34,7 +34,7 @@ async-trait = { workspace = true } aws-config = { workspace = true } aws-sdk-glue = { workspace = true } iceberg = { workspace = true } -iceberg-storage-opendal = { workspace = true, features = ["storage-s3"] } +iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] } serde_json = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index b5a549ce4d..c657394b82 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -54,7 +54,7 @@ motore-macros = { workspace = true } volo = { workspace = true } [dev-dependencies] -iceberg-storage-opendal = { workspace = true, features = ["storage-s3"] } +iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] } iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } [package.metadata.cargo-machete] diff --git a/crates/catalog/s3tables/Cargo.toml b/crates/catalog/s3tables/Cargo.toml index dbd2ef008a..2fe096fec9 100644 --- a/crates/catalog/s3tables/Cargo.toml +++ b/crates/catalog/s3tables/Cargo.toml @@ -35,7 +35,7 @@ async-trait = { workspace = true } aws-config = { workspace = true } aws-sdk-s3tables = { workspace = true } iceberg = { workspace = true } -iceberg-storage-opendal = { workspace = true, features = ["storage-s3"] } +iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] } [dev-dependencies] diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index 97ec82f318..cfdd78ee95 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -46,4 +46,4 @@ required-features = ["storage-oss"] [features] default = [] -storage-oss = ["iceberg-storage-opendal/storage-oss"] +storage-oss = ["iceberg-storage-opendal/opendal-oss"] diff --git a/crates/iceberg/src/io/storage/mod.rs b/crates/iceberg/src/io/storage/mod.rs index 28ccf12fa9..3c7c555a55 100644 --- a/crates/iceberg/src/io/storage/mod.rs +++ b/crates/iceberg/src/io/storage/mod.rs @@ -65,7 +65,6 @@ use crate::Result; /// } /// // ... implement other methods /// } -/// /// ``` #[async_trait] #[typetag::serde(tag = "type")] @@ -122,7 +121,6 @@ pub trait Storage: Debug + Send + Sync { /// todo!() /// } /// } -/// /// ``` #[typetag::serde(tag = "type")] pub trait StorageFactory: Debug + Send + Sync { diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index 0fb609f754..774ef13fc3 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -30,7 +30,7 @@ arrow-schema = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } -iceberg-storage-opendal = { workspace = true, features = ["storage-s3"] } +iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } parquet = { workspace = true } tokio = { workspace = true } diff --git a/crates/storage/opendal/Cargo.toml b/crates/storage/opendal/Cargo.toml index 4ff80f1863..7ced43fc3b 100644 --- a/crates/storage/opendal/Cargo.toml +++ b/crates/storage/opendal/Cargo.toml @@ -27,15 +27,15 @@ description = "Apache Iceberg OpenDAL storage implementation" keywords = ["iceberg", "opendal", "storage"] [features] -default = ["storage-memory", "storage-fs", "storage-s3"] -storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs", "storage-oss", "storage-azdls"] +default = ["opendal-memory", "opendal-fs", "opendal-s3"] +opendal-all = ["opendal-memory", "opendal-fs", "opendal-s3", "opendal-gcs", "opendal-oss", "opendal-azdls"] -storage-azdls = ["opendal/services-azdls"] -storage-fs = ["opendal/services-fs"] -storage-gcs = ["opendal/services-gcs"] -storage-memory = ["opendal/services-memory"] -storage-oss = ["opendal/services-oss"] -storage-s3 = ["opendal/services-s3", "reqsign"] +opendal-azdls = ["opendal/services-azdls"] +opendal-fs = ["opendal/services-fs"] +opendal-gcs = ["opendal/services-gcs"] +opendal-memory = ["opendal/services-memory"] +opendal-oss = ["opendal/services-oss"] +opendal-s3 = ["opendal/services-s3", "reqsign"] [dependencies] anyhow = { workspace = true } diff --git a/crates/storage/opendal/src/azdls.rs b/crates/storage/opendal/src/azdls.rs index bda07500d0..70caae7c4e 100644 --- a/crates/storage/opendal/src/azdls.rs +++ b/crates/storage/opendal/src/azdls.rs @@ -31,8 +31,9 @@ use url::Url; use crate::utils::from_opendal_error; -/// Local version of ensure_data_valid macro since the iceberg crate's macro -/// references private modules via $crate paths. +/// Local version of `ensure_data_valid` macro since the iceberg crate's macro +/// uses `$crate::error::Error` paths that don't resolve from external crates +/// (the `error` module is private). macro_rules! ensure_data_valid { ($cond:expr, $fmt:literal, $($arg:tt)*) => { if !$cond { diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index 3aeab768cf..47544633e8 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -24,53 +24,53 @@ mod utils; -#[cfg(feature = "storage-azdls")] +#[cfg(feature = "opendal-azdls")] mod azdls; -#[cfg(feature = "storage-fs")] +#[cfg(feature = "opendal-fs")] mod fs; -#[cfg(feature = "storage-gcs")] +#[cfg(feature = "opendal-gcs")] mod gcs; -#[cfg(feature = "storage-memory")] +#[cfg(feature = "opendal-memory")] mod memory; -#[cfg(feature = "storage-oss")] +#[cfg(feature = "opendal-oss")] mod oss; -#[cfg(feature = "storage-s3")] +#[cfg(feature = "opendal-s3")] mod s3; use std::sync::Arc; use async_trait::async_trait; -#[cfg(feature = "storage-azdls")] +#[cfg(feature = "opendal-azdls")] use azdls::AzureStorageScheme; -#[cfg(feature = "storage-azdls")] +#[cfg(feature = "opendal-azdls")] use azdls::*; use bytes::Bytes; -#[cfg(feature = "storage-fs")] +#[cfg(feature = "opendal-fs")] use fs::*; -#[cfg(feature = "storage-gcs")] +#[cfg(feature = "opendal-gcs")] use gcs::*; use iceberg::io::{ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, StorageFactory, }; use iceberg::{Error, ErrorKind, Result}; -#[cfg(feature = "storage-memory")] +#[cfg(feature = "opendal-memory")] use memory::*; use opendal::Operator; use opendal::layers::RetryLayer; -#[cfg(feature = "storage-azdls")] +#[cfg(feature = "opendal-azdls")] use opendal::services::AzdlsConfig; -#[cfg(feature = "storage-gcs")] +#[cfg(feature = "opendal-gcs")] use opendal::services::GcsConfig; -#[cfg(feature = "storage-oss")] +#[cfg(feature = "opendal-oss")] use opendal::services::OssConfig; -#[cfg(feature = "storage-s3")] +#[cfg(feature = "opendal-s3")] use opendal::services::S3Config; -#[cfg(feature = "storage-oss")] +#[cfg(feature = "opendal-oss")] use oss::*; -#[cfg(feature = "storage-s3")] +#[cfg(feature = "opendal-s3")] pub use s3::CustomAwsCredentialLoader; -#[cfg(feature = "storage-s3")] +#[cfg(feature = "opendal-s3")] pub use s3::*; use serde::{Deserialize, Serialize}; use utils::from_opendal_error; @@ -82,13 +82,13 @@ use utils::from_opendal_error; #[derive(Clone, Debug, Serialize, Deserialize)] pub enum OpenDalStorageFactory { /// Memory storage factory. - #[cfg(feature = "storage-memory")] + #[cfg(feature = "opendal-memory")] Memory, /// Local filesystem storage factory. - #[cfg(feature = "storage-fs")] + #[cfg(feature = "opendal-fs")] Fs, /// S3 storage factory. - #[cfg(feature = "storage-s3")] + #[cfg(feature = "opendal-s3")] S3 { /// s3 storage could have `s3://` and `s3a://`. /// Storing the scheme string here to return the correct path. @@ -98,13 +98,13 @@ pub enum OpenDalStorageFactory { customized_credential_load: Option, }, /// GCS storage factory. - #[cfg(feature = "storage-gcs")] + #[cfg(feature = "opendal-gcs")] Gcs, /// OSS storage factory. - #[cfg(feature = "storage-oss")] + #[cfg(feature = "opendal-oss")] Oss, /// Azure Data Lake Storage factory. - #[cfg(feature = "storage-azdls")] + #[cfg(feature = "opendal-azdls")] Azdls { /// The configured Azure storage scheme. configured_scheme: AzureStorageScheme, @@ -116,13 +116,13 @@ impl StorageFactory for OpenDalStorageFactory { #[allow(unused_variables)] fn build(&self, config: &StorageConfig) -> Result> { match self { - #[cfg(feature = "storage-memory")] + #[cfg(feature = "opendal-memory")] OpenDalStorageFactory::Memory => { Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?))) } - #[cfg(feature = "storage-fs")] + #[cfg(feature = "opendal-fs")] OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)), - #[cfg(feature = "storage-s3")] + #[cfg(feature = "opendal-s3")] OpenDalStorageFactory::S3 { configured_scheme, customized_credential_load, @@ -131,15 +131,15 @@ impl StorageFactory for OpenDalStorageFactory { config: s3_config_parse(config.props().clone())?.into(), customized_credential_load: customized_credential_load.clone(), })), - #[cfg(feature = "storage-gcs")] + #[cfg(feature = "opendal-gcs")] OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs { config: gcs_config_parse(config.props().clone())?.into(), })), - #[cfg(feature = "storage-oss")] + #[cfg(feature = "opendal-oss")] OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss { config: oss_config_parse(config.props().clone())?.into(), })), - #[cfg(feature = "storage-azdls")] + #[cfg(feature = "opendal-azdls")] OpenDalStorageFactory::Azdls { configured_scheme } => { Ok(Arc::new(OpenDalStorage::Azdls { configured_scheme: configured_scheme.clone(), @@ -147,12 +147,12 @@ impl StorageFactory for OpenDalStorageFactory { })) } #[cfg(all( - not(feature = "storage-memory"), - not(feature = "storage-fs"), - not(feature = "storage-s3"), - not(feature = "storage-gcs"), - not(feature = "storage-oss"), - not(feature = "storage-azdls"), + not(feature = "opendal-memory"), + not(feature = "opendal-fs"), + not(feature = "opendal-s3"), + not(feature = "opendal-gcs"), + not(feature = "opendal-oss"), + not(feature = "opendal-azdls"), ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -163,7 +163,7 @@ impl StorageFactory for OpenDalStorageFactory { } /// Default memory operator for serde deserialization. -#[cfg(feature = "storage-memory")] +#[cfg(feature = "opendal-memory")] fn default_memory_operator() -> Operator { memory_config_build().expect("Failed to create default memory operator") } @@ -172,13 +172,13 @@ fn default_memory_operator() -> Operator { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum OpenDalStorage { /// Memory storage variant. - #[cfg(feature = "storage-memory")] + #[cfg(feature = "opendal-memory")] Memory(#[serde(skip, default = "self::default_memory_operator")] Operator), /// Local filesystem storage variant. - #[cfg(feature = "storage-fs")] + #[cfg(feature = "opendal-fs")] LocalFs, /// S3 storage variant. - #[cfg(feature = "storage-s3")] + #[cfg(feature = "opendal-s3")] S3 { /// s3 storage could have `s3://` and `s3a://`. /// Storing the scheme string here to return the correct path. @@ -190,13 +190,13 @@ pub enum OpenDalStorage { customized_credential_load: Option, }, /// GCS storage variant. - #[cfg(feature = "storage-gcs")] + #[cfg(feature = "opendal-gcs")] Gcs { /// GCS configuration. config: Arc, }, /// OSS storage variant. - #[cfg(feature = "storage-oss")] + #[cfg(feature = "opendal-oss")] Oss { /// OSS configuration. config: Arc, @@ -205,7 +205,7 @@ pub enum OpenDalStorage { /// Expects paths of the form /// `abfs[s]://@.dfs./` or /// `wasb[s]://@.blob./`. - #[cfg(feature = "storage-azdls")] + #[cfg(feature = "opendal-azdls")] #[allow(private_interfaces)] Azdls { /// The configured Azure storage scheme. @@ -237,7 +237,7 @@ impl OpenDalStorage { ) -> Result<(Operator, &'a str)> { let path = path.as_ref(); let (operator, relative_path): (Operator, &str) = match self { - #[cfg(feature = "storage-memory")] + #[cfg(feature = "opendal-memory")] OpenDalStorage::Memory(op) => { if let Some(stripped) = path.strip_prefix("memory:/") { (op.clone(), stripped) @@ -245,7 +245,7 @@ impl OpenDalStorage { (op.clone(), &path[1..]) } } - #[cfg(feature = "storage-fs")] + #[cfg(feature = "opendal-fs")] OpenDalStorage::LocalFs => { let op = fs_config_build()?; if let Some(stripped) = path.strip_prefix("file:/") { @@ -254,7 +254,7 @@ impl OpenDalStorage { (op, &path[1..]) } } - #[cfg(feature = "storage-s3")] + #[cfg(feature = "opendal-s3")] OpenDalStorage::S3 { configured_scheme, config, @@ -274,7 +274,7 @@ impl OpenDalStorage { )); } } - #[cfg(feature = "storage-gcs")] + #[cfg(feature = "opendal-gcs")] OpenDalStorage::Gcs { config } => { let operator = gcs_config_build(config, path)?; let prefix = format!("gs://{}/", operator.info().name()); @@ -287,7 +287,7 @@ impl OpenDalStorage { )); } } - #[cfg(feature = "storage-oss")] + #[cfg(feature = "opendal-oss")] OpenDalStorage::Oss { config } => { let op = oss_config_build(config, path)?; let prefix = format!("oss://{}/", op.info().name()); @@ -300,17 +300,17 @@ impl OpenDalStorage { )); } } - #[cfg(feature = "storage-azdls")] + #[cfg(feature = "opendal-azdls")] OpenDalStorage::Azdls { configured_scheme, config, } => azdls_create_operator(path, config, configured_scheme)?, #[cfg(all( - not(feature = "storage-s3"), - not(feature = "storage-fs"), - not(feature = "storage-gcs"), - not(feature = "storage-oss"), - not(feature = "storage-azdls"), + not(feature = "opendal-s3"), + not(feature = "opendal-fs"), + not(feature = "opendal-gcs"), + not(feature = "opendal-oss"), + not(feature = "opendal-azdls"), ))] _ => { return Err(Error::new( @@ -440,7 +440,7 @@ impl FileWrite for OpenDalWriter { mod tests { use super::*; - #[cfg(feature = "storage-memory")] + #[cfg(feature = "opendal-memory")] #[test] fn test_default_memory_operator() { let op = default_memory_operator(); diff --git a/crates/storage/opendal/tests/file_io_gcs_test.rs b/crates/storage/opendal/tests/file_io_gcs_test.rs index 158c4413d6..5e04491131 100644 --- a/crates/storage/opendal/tests/file_io_gcs_test.rs +++ b/crates/storage/opendal/tests/file_io_gcs_test.rs @@ -19,7 +19,7 @@ //! //! These tests assume Docker containers are started externally via `make docker-up`. -#[cfg(feature = "storage-gcs")] +#[cfg(feature = "opendal-gcs")] mod tests { use std::collections::HashMap; use std::sync::Arc; diff --git a/crates/storage/opendal/tests/file_io_s3_test.rs b/crates/storage/opendal/tests/file_io_s3_test.rs index a22c96e0a2..5801af0606 100644 --- a/crates/storage/opendal/tests/file_io_s3_test.rs +++ b/crates/storage/opendal/tests/file_io_s3_test.rs @@ -19,7 +19,7 @@ //! //! These tests assume Docker containers are started externally via `make docker-up`. //! Each test uses unique file paths based on module path to avoid conflicts. -#[cfg(feature = "storage-s3")] +#[cfg(feature = "opendal-s3")] mod tests { use std::sync::Arc; From b6f7649139b8952bd40bf56fe33c3a9e73a12f89 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 4 Mar 2026 15:42:39 -0800 Subject: [PATCH 5/9] fix fmt --- bindings/python/src/datafusion_table_provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bindings/python/src/datafusion_table_provider.rs b/bindings/python/src/datafusion_table_provider.rs index 33923128fe..7fa9f53dbd 100644 --- a/bindings/python/src/datafusion_table_provider.rs +++ b/bindings/python/src/datafusion_table_provider.rs @@ -23,9 +23,9 @@ use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec; use datafusion_ffi::table_provider::FFI_TableProvider; use iceberg::TableIdent; use iceberg::io::{FileIOBuilder, StorageFactory}; -use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg::table::StaticTable; use iceberg_datafusion::table::IcebergStaticTableProvider; +use iceberg_storage_opendal::OpenDalStorageFactory; use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::{PyAnyMethods, PyCapsuleMethods, *}; use pyo3::types::{PyAny, PyCapsule}; From 5aa05931580de0936adca3a6064cae8bb2e9fd31 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 4 Mar 2026 16:12:27 -0800 Subject: [PATCH 6/9] fix dep issues --- Cargo.lock | 1 - crates/storage/opendal/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 92bdb6d27a..7020e05060 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3537,7 +3537,6 @@ dependencies = [ "reqsign", "reqwest", "serde", - "serde_derive", "tokio", "typetag", "url", diff --git a/crates/storage/opendal/Cargo.toml b/crates/storage/opendal/Cargo.toml index 7ced43fc3b..7fd36dc06b 100644 --- a/crates/storage/opendal/Cargo.toml +++ b/crates/storage/opendal/Cargo.toml @@ -46,7 +46,6 @@ bytes = { workspace = true } reqsign = { version = "0.16.3", optional = true, default-features = false } reqwest = { workspace = true } serde = { workspace = true } -serde_derive = { workspace = true } typetag = { workspace = true } url = { workspace = true } From 3dc24687acd1777658912965d10184582590a141 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 4 Mar 2026 17:31:52 -0800 Subject: [PATCH 7/9] group use and mod by services --- crates/storage/opendal/src/lib.rs | 59 +++++++++++++++++-------------- 1 file changed, 32 insertions(+), 27 deletions(-) diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index 47544633e8..b9332cff06 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -24,56 +24,61 @@ mod utils; -#[cfg(feature = "opendal-azdls")] -mod azdls; -#[cfg(feature = "opendal-fs")] -mod fs; -#[cfg(feature = "opendal-gcs")] -mod gcs; -#[cfg(feature = "opendal-memory")] -mod memory; -#[cfg(feature = "opendal-oss")] -mod oss; -#[cfg(feature = "opendal-s3")] -mod s3; - use std::sync::Arc; use async_trait::async_trait; -#[cfg(feature = "opendal-azdls")] -use azdls::AzureStorageScheme; -#[cfg(feature = "opendal-azdls")] -use azdls::*; use bytes::Bytes; -#[cfg(feature = "opendal-fs")] -use fs::*; -#[cfg(feature = "opendal-gcs")] -use gcs::*; use iceberg::io::{ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, StorageFactory, }; use iceberg::{Error, ErrorKind, Result}; -#[cfg(feature = "opendal-memory")] -use memory::*; use opendal::Operator; use opendal::layers::RetryLayer; +use serde::{Deserialize, Serialize}; +use utils::from_opendal_error; + +#[cfg(feature = "opendal-azdls")] +mod azdls; +#[cfg(feature = "opendal-azdls")] +use azdls::AzureStorageScheme; +#[cfg(feature = "opendal-azdls")] +use azdls::*; #[cfg(feature = "opendal-azdls")] use opendal::services::AzdlsConfig; + +#[cfg(feature = "opendal-fs")] +mod fs; +#[cfg(feature = "opendal-fs")] +use fs::*; + +#[cfg(feature = "opendal-gcs")] +mod gcs; +#[cfg(feature = "opendal-gcs")] +use gcs::*; #[cfg(feature = "opendal-gcs")] use opendal::services::GcsConfig; + +#[cfg(feature = "opendal-memory")] +mod memory; +#[cfg(feature = "opendal-memory")] +use memory::*; + +#[cfg(feature = "opendal-oss")] +mod oss; #[cfg(feature = "opendal-oss")] use opendal::services::OssConfig; -#[cfg(feature = "opendal-s3")] -use opendal::services::S3Config; #[cfg(feature = "opendal-oss")] use oss::*; + +#[cfg(feature = "opendal-s3")] +mod s3; +#[cfg(feature = "opendal-s3")] +use opendal::services::S3Config; #[cfg(feature = "opendal-s3")] pub use s3::CustomAwsCredentialLoader; #[cfg(feature = "opendal-s3")] pub use s3::*; -use serde::{Deserialize, Serialize}; -use utils::from_opendal_error; /// OpenDAL-based storage factory. /// From 2f7b7727b01829e20d0a503ba33aeb1070053056 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 4 Mar 2026 17:33:45 -0800 Subject: [PATCH 8/9] add storage crate to publish workflow --- .github/workflows/publish.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 4504f6e2f0..c1c9046154 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -39,6 +39,7 @@ jobs: # Order here is sensitive, as it will be used to determine the order of publishing package: - "crates/iceberg" + - "crates/storage/opendal" - "crates/catalog/glue" - "crates/catalog/hms" - "crates/catalog/rest" From 5fc7934b6e6fd0d268ece6e03c23b690a7638f0a Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Wed, 4 Mar 2026 17:46:13 -0800 Subject: [PATCH 9/9] add cfg_if --- Cargo.lock | 1 + Cargo.toml | 1 + crates/storage/opendal/Cargo.toml | 1 + crates/storage/opendal/src/lib.rs | 86 +++++++++++++++++-------------- 4 files changed, 49 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7020e05060..9a460707eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3531,6 +3531,7 @@ dependencies = [ "anyhow", "async-trait", "bytes", + "cfg-if", "iceberg", "iceberg_test_utils", "opendal", diff --git a/Cargo.toml b/Cargo.toml index 78a9335ad1..4a9a0d777e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -60,6 +60,7 @@ backon = "1.5.1" base64 = "0.22.1" bimap = "0.6" bytes = "1.11" +cfg-if = "1" chrono = "0.4.41" clap = { version = "4.5.48", features = ["derive", "cargo"] } dashmap = "6" diff --git a/crates/storage/opendal/Cargo.toml b/crates/storage/opendal/Cargo.toml index 7fd36dc06b..e0a3cf8ed6 100644 --- a/crates/storage/opendal/Cargo.toml +++ b/crates/storage/opendal/Cargo.toml @@ -39,6 +39,7 @@ opendal-s3 = ["opendal/services-s3", "reqsign"] [dependencies] anyhow = { workspace = true } +cfg-if = { workspace = true } iceberg = { workspace = true } opendal = { workspace = true } async-trait = { workspace = true } diff --git a/crates/storage/opendal/src/lib.rs b/crates/storage/opendal/src/lib.rs index b9332cff06..1e5043acae 100644 --- a/crates/storage/opendal/src/lib.rs +++ b/crates/storage/opendal/src/lib.rs @@ -28,6 +28,7 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; +use cfg_if::cfg_if; use iceberg::io::{ FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, StorageFactory, @@ -38,47 +39,52 @@ use opendal::layers::RetryLayer; use serde::{Deserialize, Serialize}; use utils::from_opendal_error; -#[cfg(feature = "opendal-azdls")] -mod azdls; -#[cfg(feature = "opendal-azdls")] -use azdls::AzureStorageScheme; -#[cfg(feature = "opendal-azdls")] -use azdls::*; -#[cfg(feature = "opendal-azdls")] -use opendal::services::AzdlsConfig; - -#[cfg(feature = "opendal-fs")] -mod fs; -#[cfg(feature = "opendal-fs")] -use fs::*; - -#[cfg(feature = "opendal-gcs")] -mod gcs; -#[cfg(feature = "opendal-gcs")] -use gcs::*; -#[cfg(feature = "opendal-gcs")] -use opendal::services::GcsConfig; +cfg_if! { + if #[cfg(feature = "opendal-azdls")] { + mod azdls; + use azdls::AzureStorageScheme; + use azdls::*; + use opendal::services::AzdlsConfig; + } +} -#[cfg(feature = "opendal-memory")] -mod memory; -#[cfg(feature = "opendal-memory")] -use memory::*; - -#[cfg(feature = "opendal-oss")] -mod oss; -#[cfg(feature = "opendal-oss")] -use opendal::services::OssConfig; -#[cfg(feature = "opendal-oss")] -use oss::*; - -#[cfg(feature = "opendal-s3")] -mod s3; -#[cfg(feature = "opendal-s3")] -use opendal::services::S3Config; -#[cfg(feature = "opendal-s3")] -pub use s3::CustomAwsCredentialLoader; -#[cfg(feature = "opendal-s3")] -pub use s3::*; +cfg_if! { + if #[cfg(feature = "opendal-fs")] { + mod fs; + use fs::*; + } +} + +cfg_if! { + if #[cfg(feature = "opendal-gcs")] { + mod gcs; + use gcs::*; + use opendal::services::GcsConfig; + } +} + +cfg_if! { + if #[cfg(feature = "opendal-memory")] { + mod memory; + use memory::*; + } +} + +cfg_if! { + if #[cfg(feature = "opendal-oss")] { + mod oss; + use opendal::services::OssConfig; + use oss::*; + } +} + +cfg_if! { + if #[cfg(feature = "opendal-s3")] { + mod s3; + use opendal::services::S3Config; + pub use s3::*; + } +} /// OpenDAL-based storage factory. ///