diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 4504f6e2f0..c1c9046154 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -39,6 +39,7 @@ jobs: # Order here is sensitive, as it will be used to determine the order of publishing package: - "crates/iceberg" + - "crates/storage/opendal" - "crates/catalog/glue" - "crates/catalog/hms" - "crates/catalog/rest" diff --git a/Cargo.lock b/Cargo.lock index 8f5df819a2..9a460707eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3299,13 +3299,11 @@ dependencies = [ "moka", "murmur3", "once_cell", - "opendal", "ordered-float 4.6.0", "parquet", "pretty_assertions", "rand 0.8.5", "regex", - "reqsign", "reqwest", "roaring", "serde", @@ -3341,6 +3339,7 @@ dependencies = [ "aws-config", "aws-sdk-glue", "iceberg", + "iceberg-storage-opendal", "iceberg_test_utils", "serde_json", "tokio", @@ -3357,6 +3356,7 @@ dependencies = [ "faststr", "hive_metastore", "iceberg", + "iceberg-storage-opendal", "iceberg_test_utils", "linkedbytes", "metainfo", @@ -3415,6 +3415,7 @@ dependencies = [ "aws-config", "aws-sdk-s3tables", "iceberg", + "iceberg-storage-opendal", "iceberg_test_utils", "itertools 0.13.0", "tokio", @@ -3458,6 +3459,7 @@ dependencies = [ "futures", "iceberg", "iceberg-catalog-rest", + "iceberg-storage-opendal", "tokio", ] @@ -3470,6 +3472,7 @@ dependencies = [ "futures", "iceberg", "iceberg-catalog-rest", + "iceberg-storage-opendal", "iceberg_test_utils", "ordered-float 2.10.1", "parquet", @@ -3521,6 +3524,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "iceberg-storage-opendal" +version = "0.8.0" +dependencies = [ + "anyhow", + "async-trait", + "bytes", + "cfg-if", + "iceberg", + "iceberg_test_utils", + "opendal", + "reqsign", + "reqwest", + "serde", + "tokio", + "typetag", + "url", +] + [[package]] name = "iceberg_test_utils" version = "0.8.0" diff --git a/Cargo.toml b/Cargo.toml index 23b498214a..4a9a0d777e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ members = [ "crates/integration_tests", "crates/integrations/*", "crates/sqllogictest", + "crates/storage/*", "crates/test_utils", ] resolver = "2" @@ -59,6 +60,7 @@ backon = "1.5.1" base64 = "0.22.1" bimap = "0.6" bytes = "1.11" +cfg-if = "1" chrono = "0.4.41" clap = { version = "4.5.48", features = ["derive", "cargo"] } dashmap = "6" @@ -85,6 +87,7 @@ iceberg-catalog-rest = { version = "0.8.0", path = "./crates/catalog/rest" } iceberg-catalog-s3tables = { version = "0.8.0", path = "./crates/catalog/s3tables" } iceberg-catalog-sql = { version = "0.8.0", path = "./crates/catalog/sql" } iceberg-datafusion = { version = "0.8.0", path = "./crates/integrations/datafusion" } +iceberg-storage-opendal = { version = "0.8.0", path = "./crates/storage/opendal" } indicatif = "0.18" itertools = "0.13" libtest-mimic = "0.8.1" diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 9279983f96..bcc4f42681 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -33,6 +33,7 @@ crate-type = ["cdylib"] [dependencies] arrow = { version = "57.1", features = ["pyarrow", "chrono-tz"] } iceberg = { path = "../../crates/iceberg" } +iceberg-storage-opendal = { path = "../../crates/storage/opendal", features = ["opendal-s3", "opendal-fs", "opendal-memory"] } pyo3 = { version = "0.26", features = ["extension-module", "abi3-py310"] } iceberg-datafusion = { path = "../../crates/integrations/datafusion" } datafusion-ffi = { version = "52.1" } diff --git a/bindings/python/src/datafusion_table_provider.rs b/bindings/python/src/datafusion_table_provider.rs index d4a0234ff7..7fa9f53dbd 100644 --- a/bindings/python/src/datafusion_table_provider.rs +++ b/bindings/python/src/datafusion_table_provider.rs @@ -22,9 +22,10 @@ use std::sync::Arc; use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec; use datafusion_ffi::table_provider::FFI_TableProvider; use iceberg::TableIdent; -use iceberg::io::{FileIOBuilder, OpenDalStorageFactory, StorageFactory}; +use iceberg::io::{FileIOBuilder, StorageFactory}; use iceberg::table::StaticTable; use iceberg_datafusion::table::IcebergStaticTableProvider; +use iceberg_storage_opendal::OpenDalStorageFactory; use pyo3::exceptions::{PyRuntimeError, PyValueError}; use pyo3::prelude::{PyAnyMethods, PyCapsuleMethods, *}; use pyo3::types::{PyAny, PyCapsule}; diff --git a/crates/catalog/glue/Cargo.toml b/crates/catalog/glue/Cargo.toml index f42fedeae1..e41253de36 100644 --- a/crates/catalog/glue/Cargo.toml +++ b/crates/catalog/glue/Cargo.toml @@ -34,6 +34,7 @@ async-trait = { workspace = true } aws-config = { workspace = true } aws-sdk-glue = { workspace = true } iceberg = { workspace = true } +iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] } serde_json = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 7894900068..bf2f392330 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -25,8 +25,8 @@ use aws_sdk_glue::operation::create_table::CreateTableError; use aws_sdk_glue::operation::update_table::UpdateTableError; use aws_sdk_glue::types::TableInput; use iceberg::io::{ - FileIO, FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, - S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, StorageFactory, + FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, + S3_SESSION_TOKEN, StorageFactory, }; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; @@ -34,6 +34,7 @@ use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; +use iceberg_storage_opendal::OpenDalStorageFactory; use crate::error::{from_aws_build_error, from_aws_sdk_error}; use crate::utils::{ diff --git a/crates/catalog/glue/tests/glue_catalog_test.rs b/crates/catalog/glue/tests/glue_catalog_test.rs index a209ae09c8..0b7dbe9f23 100644 --- a/crates/catalog/glue/tests/glue_catalog_test.rs +++ b/crates/catalog/glue/tests/glue_catalog_test.rs @@ -23,10 +23,7 @@ use std::collections::HashMap; use std::sync::Arc; -use iceberg::io::{ - FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, - S3_SECRET_ACCESS_KEY, -}; +use iceberg::io::{FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::{ @@ -36,6 +33,7 @@ use iceberg_catalog_glue::{ AWS_ACCESS_KEY_ID, AWS_REGION_NAME, AWS_SECRET_ACCESS_KEY, GLUE_CATALOG_PROP_URI, GLUE_CATALOG_PROP_WAREHOUSE, GlueCatalog, GlueCatalogBuilder, }; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg_test_utils::{ cleanup_namespace, get_glue_endpoint, get_minio_endpoint, normalize_test_name_with_parts, set_up, diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index a6517fb7b0..c657394b82 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -54,6 +54,7 @@ motore-macros = { workspace = true } volo = { workspace = true } [dev-dependencies] +iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] } iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } [package.metadata.cargo-machete] diff --git a/crates/catalog/hms/tests/hms_catalog_test.rs b/crates/catalog/hms/tests/hms_catalog_test.rs index c5d77ac37a..74c9e52e92 100644 --- a/crates/catalog/hms/tests/hms_catalog_test.rs +++ b/crates/catalog/hms/tests/hms_catalog_test.rs @@ -23,16 +23,14 @@ use std::collections::HashMap; use std::sync::Arc; -use iceberg::io::{ - FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, - S3_SECRET_ACCESS_KEY, -}; +use iceberg::io::{FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent, TableCreation, TableIdent}; use iceberg_catalog_hms::{ HMS_CATALOG_PROP_THRIFT_TRANSPORT, HMS_CATALOG_PROP_URI, HMS_CATALOG_PROP_WAREHOUSE, HmsCatalog, HmsCatalogBuilder, THRIFT_TRANSPORT_BUFFERED, }; +use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg_test_utils::{ cleanup_namespace, get_hms_endpoint, get_minio_endpoint, normalize_test_name_with_parts, set_up, }; diff --git a/crates/catalog/s3tables/Cargo.toml b/crates/catalog/s3tables/Cargo.toml index fde08b9a49..2fe096fec9 100644 --- a/crates/catalog/s3tables/Cargo.toml +++ b/crates/catalog/s3tables/Cargo.toml @@ -35,6 +35,7 @@ async-trait = { workspace = true } aws-config = { workspace = true } aws-sdk-s3tables = { workspace = true } iceberg = { workspace = true } +iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] } [dev-dependencies] diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 58100ccbce..afe28ae453 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -26,13 +26,14 @@ use aws_sdk_s3tables::operation::get_table::GetTableOutput; use aws_sdk_s3tables::operation::list_tables::ListTablesOutput; use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError; use aws_sdk_s3tables::types::OpenTableFormat; -use iceberg::io::{FileIO, FileIOBuilder, OpenDalStorageFactory, StorageFactory}; +use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent, }; +use iceberg_storage_opendal::OpenDalStorageFactory; use crate::utils::create_sdk_config; diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml index c7874d9a17..cfdd78ee95 100644 --- a/crates/examples/Cargo.toml +++ b/crates/examples/Cargo.toml @@ -28,6 +28,7 @@ version = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } +iceberg-storage-opendal = { workspace = true, optional = true } tokio = { workspace = true, features = ["full"] } [[example]] @@ -45,4 +46,4 @@ required-features = ["storage-oss"] [features] default = [] -storage-oss = ["iceberg/storage-oss"] +storage-oss = ["iceberg-storage-opendal/opendal-oss"] diff --git a/crates/examples/src/oss_backend.rs b/crates/examples/src/oss_backend.rs index 9835b8dc44..2d56cceb7f 100644 --- a/crates/examples/src/oss_backend.rs +++ b/crates/examples/src/oss_backend.rs @@ -19,9 +19,9 @@ use std::collections::HashMap; use std::sync::Arc; use futures::stream::StreamExt; -use iceberg::io::OpenDalStorageFactory; use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableIdent}; use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalogBuilder}; +use iceberg_storage_opendal::OpenDalStorageFactory; // Configure these values according to your environment diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml index d6d931c86d..41ee771617 100644 --- a/crates/iceberg/Cargo.toml +++ b/crates/iceberg/Cargo.toml @@ -29,15 +29,7 @@ license = { workspace = true } repository = { workspace = true } [features] -default = ["storage-memory", "storage-fs", "storage-s3"] -storage-all = ["storage-memory", "storage-fs", "storage-s3", "storage-gcs"] - -storage-azdls = ["opendal/services-azdls"] -storage-fs = ["opendal/services-fs"] -storage-gcs = ["opendal/services-gcs"] -storage-memory = ["opendal/services-memory"] -storage-oss = ["opendal/services-oss"] -storage-s3 = ["opendal/services-s3", "reqsign"] +default = [] [dependencies] @@ -68,11 +60,9 @@ itertools = { workspace = true } moka = { version = "0.12.10", features = ["future"] } murmur3 = { workspace = true } once_cell = { workspace = true } -opendal = { workspace = true } ordered-float = { workspace = true } parquet = { workspace = true, features = ["async"] } rand = { workspace = true } -reqsign = { version = "0.16.3", optional = true, default-features = false } reqwest = { workspace = true } roaring = { workspace = true } fastnum = { workspace = true } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index e5014c3e70..8db9674ad9 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -129,7 +129,8 @@ pub trait CatalogBuilder: Default + Debug + Send + Sync { /// /// ```rust,ignore /// use iceberg::CatalogBuilder; - /// use iceberg::io::{OpenDalStorageFactory, StorageFactory}; + /// use iceberg::io::StorageFactory; + /// use iceberg_storage_opendal::OpenDalStorageFactory; /// use std::sync::Arc; /// /// let catalog = MyCatalogBuilder::default() diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs index 8810ef5009..a0399a8082 100644 --- a/crates/iceberg/src/error.rs +++ b/crates/iceberg/src/error.rs @@ -384,12 +384,6 @@ define_from_err!( "Failure in conversion with avro" ); -define_from_err!( - opendal::Error, - ErrorKind::Unexpected, - "Failure in doing io operation" -); - define_from_err!( url::ParseError, ErrorKind::DataInvalid, diff --git a/crates/iceberg/src/io/storage/mod.rs b/crates/iceberg/src/io/storage/mod.rs index 31ceaf2436..3c7c555a55 100644 --- a/crates/iceberg/src/io/storage/mod.rs +++ b/crates/iceberg/src/io/storage/mod.rs @@ -20,7 +20,6 @@ mod config; mod local_fs; mod memory; -mod opendal; use std::fmt::Debug; use std::sync::Arc; @@ -30,9 +29,6 @@ use bytes::Bytes; pub use config::*; pub use local_fs::{LocalFsStorage, LocalFsStorageFactory}; pub use memory::{MemoryStorage, MemoryStorageFactory}; -#[cfg(feature = "storage-s3")] -pub use opendal::CustomAwsCredentialLoader; -pub use opendal::{OpenDalStorage, OpenDalStorageFactory}; use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; use crate::Result; @@ -69,11 +65,6 @@ use crate::Result; /// } /// // ... implement other methods /// } -/// -/// TODO remove below when the trait is integrated with FileIO and Catalog -/// # NOTE -/// This trait is under heavy development and is not used anywhere as of now -/// Please DO NOT implement it /// ``` #[async_trait] #[typetag::serde(tag = "type")] @@ -130,11 +121,6 @@ pub trait Storage: Debug + Send + Sync { /// todo!() /// } /// } -/// -/// TODO remove below when the trait is integrated with FileIO and Catalog -/// # NOTE -/// This trait is under heavy development and is not used anywhere as of now -/// Please DO NOT implement it /// ``` #[typetag::serde(tag = "type")] pub trait StorageFactory: Debug + Send + Sync { diff --git a/crates/integration_tests/Cargo.toml b/crates/integration_tests/Cargo.toml index 2ed211769f..774ef13fc3 100644 --- a/crates/integration_tests/Cargo.toml +++ b/crates/integration_tests/Cargo.toml @@ -30,6 +30,7 @@ arrow-schema = { workspace = true } futures = { workspace = true } iceberg = { workspace = true } iceberg-catalog-rest = { workspace = true } +iceberg-storage-opendal = { workspace = true, features = ["opendal-s3"] } iceberg_test_utils = { path = "../test_utils", features = ["tests"] } parquet = { workspace = true } tokio = { workspace = true } diff --git a/crates/integration_tests/tests/common/mod.rs b/crates/integration_tests/tests/common/mod.rs index 957c8be5cc..e49a57465c 100644 --- a/crates/integration_tests/tests/common/mod.rs +++ b/crates/integration_tests/tests/common/mod.rs @@ -18,11 +18,11 @@ use std::collections::HashMap; use std::sync::Arc; -use iceberg::io::OpenDalStorageFactory; use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; use iceberg::{Catalog, CatalogBuilder, Namespace, NamespaceIdent}; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_integration_tests::get_test_fixture; +use iceberg_storage_opendal::OpenDalStorageFactory; pub async fn random_ns() -> Namespace { let fixture = get_test_fixture(); diff --git a/crates/integration_tests/tests/conflict_commit_test.rs b/crates/integration_tests/tests/conflict_commit_test.rs index dc3030519f..3b1362b95d 100644 --- a/crates/integration_tests/tests/conflict_commit_test.rs +++ b/crates/integration_tests/tests/conflict_commit_test.rs @@ -24,7 +24,6 @@ use std::sync::Arc; use arrow_array::{ArrayRef, BooleanArray, Int32Array, RecordBatch, StringArray}; use common::{random_ns, test_schema}; use futures::TryStreamExt; -use iceberg::io::OpenDalStorageFactory; use iceberg::transaction::{ApplyTransactionAction, Transaction}; use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder; use iceberg::writer::file_writer::ParquetWriterBuilder; @@ -36,6 +35,7 @@ use iceberg::writer::{IcebergWriter, IcebergWriterBuilder}; use iceberg::{Catalog, CatalogBuilder, TableCreation}; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_integration_tests::get_test_fixture; +use iceberg_storage_opendal::OpenDalStorageFactory; use parquet::file::properties::WriterProperties; #[tokio::test] diff --git a/crates/integration_tests/tests/read_evolved_schema.rs b/crates/integration_tests/tests/read_evolved_schema.rs index 78d833b3a1..ae25a08987 100644 --- a/crates/integration_tests/tests/read_evolved_schema.rs +++ b/crates/integration_tests/tests/read_evolved_schema.rs @@ -22,11 +22,11 @@ use std::sync::Arc; use arrow_array::{Decimal128Array, Float64Array, Int64Array, StringArray}; use futures::TryStreamExt; use iceberg::expr::Reference; -use iceberg::io::OpenDalStorageFactory; use iceberg::spec::Datum; use iceberg::{Catalog, CatalogBuilder, TableIdent}; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_integration_tests::get_test_fixture; +use iceberg_storage_opendal::OpenDalStorageFactory; use ordered_float::OrderedFloat; #[tokio::test] diff --git a/crates/integration_tests/tests/read_positional_deletes.rs b/crates/integration_tests/tests/read_positional_deletes.rs index 71ff128d1b..d4c4afeaf3 100644 --- a/crates/integration_tests/tests/read_positional_deletes.rs +++ b/crates/integration_tests/tests/read_positional_deletes.rs @@ -20,10 +20,10 @@ use std::sync::Arc; use futures::TryStreamExt; -use iceberg::io::OpenDalStorageFactory; use iceberg::{Catalog, CatalogBuilder, TableIdent}; use iceberg_catalog_rest::RestCatalogBuilder; use iceberg_integration_tests::get_test_fixture; +use iceberg_storage_opendal::OpenDalStorageFactory; #[tokio::test] async fn test_read_table_with_positional_deletes() { diff --git a/crates/storage/opendal/Cargo.toml b/crates/storage/opendal/Cargo.toml new file mode 100644 index 0000000000..e0a3cf8ed6 --- /dev/null +++ b/crates/storage/opendal/Cargo.toml @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-storage-opendal" +edition = { workspace = true } +version = { workspace = true } +license = { workspace = true } +repository = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg OpenDAL storage implementation" +keywords = ["iceberg", "opendal", "storage"] + +[features] +default = ["opendal-memory", "opendal-fs", "opendal-s3"] +opendal-all = ["opendal-memory", "opendal-fs", "opendal-s3", "opendal-gcs", "opendal-oss", "opendal-azdls"] + +opendal-azdls = ["opendal/services-azdls"] +opendal-fs = ["opendal/services-fs"] +opendal-gcs = ["opendal/services-gcs"] +opendal-memory = ["opendal/services-memory"] +opendal-oss = ["opendal/services-oss"] +opendal-s3 = ["opendal/services-s3", "reqsign"] + +[dependencies] +anyhow = { workspace = true } +cfg-if = { workspace = true } +iceberg = { workspace = true } +opendal = { workspace = true } +async-trait = { workspace = true } +bytes = { workspace = true } +reqsign = { version = "0.16.3", optional = true, default-features = false } +reqwest = { workspace = true } +serde = { workspace = true } +typetag = { workspace = true } +url = { workspace = true } + +[dev-dependencies] +async-trait = { workspace = true } +iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } +reqsign = { version = "0.16.3", default-features = false } +reqwest = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/crates/iceberg/src/io/storage/opendal/azdls.rs b/crates/storage/opendal/src/azdls.rs similarity index 97% rename from crates/iceberg/src/io/storage/opendal/azdls.rs rename to crates/storage/opendal/src/azdls.rs index 759ff301ea..70caae7c4e 100644 --- a/crates/iceberg/src/io/storage/opendal/azdls.rs +++ b/crates/storage/opendal/src/azdls.rs @@ -19,16 +19,28 @@ use std::collections::HashMap; use std::fmt::Display; use std::str::FromStr; +use iceberg::io::{ + ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_AUTHORITY_HOST, ADLS_CLIENT_ID, ADLS_CLIENT_SECRET, + ADLS_CONNECTION_STRING, ADLS_SAS_TOKEN, ADLS_TENANT_ID, +}; +use iceberg::{Error, ErrorKind, Result}; use opendal::Configurator; use opendal::services::AzdlsConfig; use serde::{Deserialize, Serialize}; use url::Url; -use crate::io::{ - ADLS_ACCOUNT_KEY, ADLS_ACCOUNT_NAME, ADLS_AUTHORITY_HOST, ADLS_CLIENT_ID, ADLS_CLIENT_SECRET, - ADLS_CONNECTION_STRING, ADLS_SAS_TOKEN, ADLS_TENANT_ID, -}; -use crate::{Error, ErrorKind, Result, ensure_data_valid}; +use crate::utils::from_opendal_error; + +/// Local version of `ensure_data_valid` macro since the iceberg crate's macro +/// uses `$crate::error::Error` paths that don't resolve from external crates +/// (the `error` module is private). +macro_rules! ensure_data_valid { + ($cond:expr, $fmt:literal, $($arg:tt)*) => { + if !$cond { + return Err(Error::new(ErrorKind::DataInvalid, format!($fmt, $($arg)*))); + } + }; +} /// Parses adls.* prefixed configuration properties. pub(crate) fn azdls_config_parse(mut properties: HashMap) -> Result { @@ -201,7 +213,9 @@ fn azdls_config_build(config: &AzdlsConfig, path: &AzureStoragePath) -> Result Result { let mut cfg = FsConfig::default(); cfg.root = Some("/".to_string()); - Ok(Operator::from_config(cfg)?.finish()) + Ok(Operator::from_config(cfg) + .map_err(from_opendal_error)? + .finish()) } diff --git a/crates/iceberg/src/io/storage/opendal/gcs.rs b/crates/storage/opendal/src/gcs.rs similarity index 91% rename from crates/iceberg/src/io/storage/opendal/gcs.rs rename to crates/storage/opendal/src/gcs.rs index 4cb8aa8591..7d4738c902 100644 --- a/crates/iceberg/src/io/storage/opendal/gcs.rs +++ b/crates/storage/opendal/src/gcs.rs @@ -18,15 +18,16 @@ use std::collections::HashMap; +use iceberg::io::{ + GCS_ALLOW_ANONYMOUS, GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, GCS_DISABLE_VM_METADATA, + GCS_NO_AUTH, GCS_SERVICE_PATH, GCS_TOKEN, +}; +use iceberg::{Error, ErrorKind, Result}; use opendal::Operator; use opendal::services::GcsConfig; use url::Url; -use crate::io::{ - GCS_ALLOW_ANONYMOUS, GCS_CREDENTIALS_JSON, GCS_DISABLE_CONFIG_LOAD, GCS_DISABLE_VM_METADATA, - GCS_NO_AUTH, GCS_SERVICE_PATH, GCS_TOKEN, is_truthy, -}; -use crate::{Error, ErrorKind, Result}; +use crate::utils::{from_opendal_error, is_truthy}; /// Parse iceberg properties to [`GcsConfig`]. pub(crate) fn gcs_config_parse(mut m: HashMap) -> Result { @@ -81,5 +82,7 @@ pub(crate) fn gcs_config_build(cfg: &GcsConfig, path: &str) -> Result let mut cfg = cfg.clone(); cfg.bucket = bucket.to_string(); - Ok(Operator::from_config(cfg)?.finish()) + Ok(Operator::from_config(cfg) + .map_err(from_opendal_error)? + .finish()) } diff --git a/crates/iceberg/src/io/storage/opendal/mod.rs b/crates/storage/opendal/src/lib.rs similarity index 70% rename from crates/iceberg/src/io/storage/opendal/mod.rs rename to crates/storage/opendal/src/lib.rs index 52d00687e0..1e5043acae 100644 --- a/crates/iceberg/src/io/storage/opendal/mod.rs +++ b/crates/storage/opendal/src/lib.rs @@ -15,60 +15,76 @@ // specific language governing permissions and limitations // under the License. -//! OpenDAL-based storage implementation. +//! OpenDAL-based storage implementation for Apache Iceberg. +//! +//! This crate provides [`OpenDalStorage`] and [`OpenDalStorageFactory`], +//! which implement the [`Storage`](iceberg::io::Storage) and +//! [`StorageFactory`](iceberg::io::StorageFactory) traits from the `iceberg` crate +//! using [OpenDAL](https://opendal.apache.org/) as the backend. + +mod utils; -use std::ops::Range; use std::sync::Arc; use async_trait::async_trait; -#[cfg(feature = "storage-azdls")] -use azdls::AzureStorageScheme; use bytes::Bytes; +use cfg_if::cfg_if; +use iceberg::io::{ + FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, + StorageFactory, +}; +use iceberg::{Error, ErrorKind, Result}; use opendal::Operator; use opendal::layers::RetryLayer; -#[cfg(feature = "storage-azdls")] -use opendal::services::AzdlsConfig; -#[cfg(feature = "storage-gcs")] -use opendal::services::GcsConfig; -#[cfg(feature = "storage-oss")] -use opendal::services::OssConfig; -#[cfg(feature = "storage-s3")] -use opendal::services::S3Config; -#[cfg(feature = "storage-s3")] -pub use s3::CustomAwsCredentialLoader; use serde::{Deserialize, Serialize}; +use utils::from_opendal_error; -use crate::io::{ - FileMetadata, FileRead, FileWrite, InputFile, OutputFile, Storage, StorageConfig, - StorageFactory, -}; -use crate::{Error, ErrorKind, Result}; - -#[cfg(feature = "storage-azdls")] -mod azdls; -#[cfg(feature = "storage-fs")] -mod fs; -#[cfg(feature = "storage-gcs")] -mod gcs; -#[cfg(feature = "storage-memory")] -mod memory; -#[cfg(feature = "storage-oss")] -mod oss; -#[cfg(feature = "storage-s3")] -mod s3; - -#[cfg(feature = "storage-azdls")] -use azdls::*; -#[cfg(feature = "storage-fs")] -use fs::*; -#[cfg(feature = "storage-gcs")] -use gcs::*; -#[cfg(feature = "storage-memory")] -use memory::*; -#[cfg(feature = "storage-oss")] -use oss::*; -#[cfg(feature = "storage-s3")] -pub use s3::*; +cfg_if! { + if #[cfg(feature = "opendal-azdls")] { + mod azdls; + use azdls::AzureStorageScheme; + use azdls::*; + use opendal::services::AzdlsConfig; + } +} + +cfg_if! { + if #[cfg(feature = "opendal-fs")] { + mod fs; + use fs::*; + } +} + +cfg_if! { + if #[cfg(feature = "opendal-gcs")] { + mod gcs; + use gcs::*; + use opendal::services::GcsConfig; + } +} + +cfg_if! { + if #[cfg(feature = "opendal-memory")] { + mod memory; + use memory::*; + } +} + +cfg_if! { + if #[cfg(feature = "opendal-oss")] { + mod oss; + use opendal::services::OssConfig; + use oss::*; + } +} + +cfg_if! { + if #[cfg(feature = "opendal-s3")] { + mod s3; + use opendal::services::S3Config; + pub use s3::*; + } +} /// OpenDAL-based storage factory. /// @@ -77,29 +93,29 @@ pub use s3::*; #[derive(Clone, Debug, Serialize, Deserialize)] pub enum OpenDalStorageFactory { /// Memory storage factory. - #[cfg(feature = "storage-memory")] + #[cfg(feature = "opendal-memory")] Memory, /// Local filesystem storage factory. - #[cfg(feature = "storage-fs")] + #[cfg(feature = "opendal-fs")] Fs, /// S3 storage factory. - #[cfg(feature = "storage-s3")] + #[cfg(feature = "opendal-s3")] S3 { /// s3 storage could have `s3://` and `s3a://`. /// Storing the scheme string here to return the correct path. configured_scheme: String, /// Custom AWS credential loader. #[serde(skip)] - customized_credential_load: Option, + customized_credential_load: Option, }, /// GCS storage factory. - #[cfg(feature = "storage-gcs")] + #[cfg(feature = "opendal-gcs")] Gcs, /// OSS storage factory. - #[cfg(feature = "storage-oss")] + #[cfg(feature = "opendal-oss")] Oss, /// Azure Data Lake Storage factory. - #[cfg(feature = "storage-azdls")] + #[cfg(feature = "opendal-azdls")] Azdls { /// The configured Azure storage scheme. configured_scheme: AzureStorageScheme, @@ -111,13 +127,13 @@ impl StorageFactory for OpenDalStorageFactory { #[allow(unused_variables)] fn build(&self, config: &StorageConfig) -> Result> { match self { - #[cfg(feature = "storage-memory")] + #[cfg(feature = "opendal-memory")] OpenDalStorageFactory::Memory => { Ok(Arc::new(OpenDalStorage::Memory(memory_config_build()?))) } - #[cfg(feature = "storage-fs")] + #[cfg(feature = "opendal-fs")] OpenDalStorageFactory::Fs => Ok(Arc::new(OpenDalStorage::LocalFs)), - #[cfg(feature = "storage-s3")] + #[cfg(feature = "opendal-s3")] OpenDalStorageFactory::S3 { configured_scheme, customized_credential_load, @@ -126,15 +142,15 @@ impl StorageFactory for OpenDalStorageFactory { config: s3_config_parse(config.props().clone())?.into(), customized_credential_load: customized_credential_load.clone(), })), - #[cfg(feature = "storage-gcs")] + #[cfg(feature = "opendal-gcs")] OpenDalStorageFactory::Gcs => Ok(Arc::new(OpenDalStorage::Gcs { config: gcs_config_parse(config.props().clone())?.into(), })), - #[cfg(feature = "storage-oss")] + #[cfg(feature = "opendal-oss")] OpenDalStorageFactory::Oss => Ok(Arc::new(OpenDalStorage::Oss { config: oss_config_parse(config.props().clone())?.into(), })), - #[cfg(feature = "storage-azdls")] + #[cfg(feature = "opendal-azdls")] OpenDalStorageFactory::Azdls { configured_scheme } => { Ok(Arc::new(OpenDalStorage::Azdls { configured_scheme: configured_scheme.clone(), @@ -142,12 +158,12 @@ impl StorageFactory for OpenDalStorageFactory { })) } #[cfg(all( - not(feature = "storage-memory"), - not(feature = "storage-fs"), - not(feature = "storage-s3"), - not(feature = "storage-gcs"), - not(feature = "storage-oss"), - not(feature = "storage-azdls"), + not(feature = "opendal-memory"), + not(feature = "opendal-fs"), + not(feature = "opendal-s3"), + not(feature = "opendal-gcs"), + not(feature = "opendal-oss"), + not(feature = "opendal-azdls"), ))] _ => Err(Error::new( ErrorKind::FeatureUnsupported, @@ -158,7 +174,7 @@ impl StorageFactory for OpenDalStorageFactory { } /// Default memory operator for serde deserialization. -#[cfg(feature = "storage-memory")] +#[cfg(feature = "opendal-memory")] fn default_memory_operator() -> Operator { memory_config_build().expect("Failed to create default memory operator") } @@ -167,13 +183,13 @@ fn default_memory_operator() -> Operator { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum OpenDalStorage { /// Memory storage variant. - #[cfg(feature = "storage-memory")] + #[cfg(feature = "opendal-memory")] Memory(#[serde(skip, default = "self::default_memory_operator")] Operator), /// Local filesystem storage variant. - #[cfg(feature = "storage-fs")] + #[cfg(feature = "opendal-fs")] LocalFs, /// S3 storage variant. - #[cfg(feature = "storage-s3")] + #[cfg(feature = "opendal-s3")] S3 { /// s3 storage could have `s3://` and `s3a://`. /// Storing the scheme string here to return the correct path. @@ -182,16 +198,16 @@ pub enum OpenDalStorage { config: Arc, /// Custom AWS credential loader. #[serde(skip)] - customized_credential_load: Option, + customized_credential_load: Option, }, /// GCS storage variant. - #[cfg(feature = "storage-gcs")] + #[cfg(feature = "opendal-gcs")] Gcs { /// GCS configuration. config: Arc, }, /// OSS storage variant. - #[cfg(feature = "storage-oss")] + #[cfg(feature = "opendal-oss")] Oss { /// OSS configuration. config: Arc, @@ -200,7 +216,7 @@ pub enum OpenDalStorage { /// Expects paths of the form /// `abfs[s]://@.dfs./` or /// `wasb[s]://@.blob./`. - #[cfg(feature = "storage-azdls")] + #[cfg(feature = "opendal-azdls")] #[allow(private_interfaces)] Azdls { /// The configured Azure storage scheme. @@ -217,7 +233,7 @@ impl OpenDalStorage { /// /// # Arguments /// - /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. + /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`](iceberg::io::FileIO). /// /// # Returns /// @@ -232,7 +248,7 @@ impl OpenDalStorage { ) -> Result<(Operator, &'a str)> { let path = path.as_ref(); let (operator, relative_path): (Operator, &str) = match self { - #[cfg(feature = "storage-memory")] + #[cfg(feature = "opendal-memory")] OpenDalStorage::Memory(op) => { if let Some(stripped) = path.strip_prefix("memory:/") { (op.clone(), stripped) @@ -240,7 +256,7 @@ impl OpenDalStorage { (op.clone(), &path[1..]) } } - #[cfg(feature = "storage-fs")] + #[cfg(feature = "opendal-fs")] OpenDalStorage::LocalFs => { let op = fs_config_build()?; if let Some(stripped) = path.strip_prefix("file:/") { @@ -249,7 +265,7 @@ impl OpenDalStorage { (op, &path[1..]) } } - #[cfg(feature = "storage-s3")] + #[cfg(feature = "opendal-s3")] OpenDalStorage::S3 { configured_scheme, config, @@ -269,7 +285,7 @@ impl OpenDalStorage { )); } } - #[cfg(feature = "storage-gcs")] + #[cfg(feature = "opendal-gcs")] OpenDalStorage::Gcs { config } => { let operator = gcs_config_build(config, path)?; let prefix = format!("gs://{}/", operator.info().name()); @@ -282,7 +298,7 @@ impl OpenDalStorage { )); } } - #[cfg(feature = "storage-oss")] + #[cfg(feature = "opendal-oss")] OpenDalStorage::Oss { config } => { let op = oss_config_build(config, path)?; let prefix = format!("oss://{}/", op.info().name()); @@ -295,17 +311,17 @@ impl OpenDalStorage { )); } } - #[cfg(feature = "storage-azdls")] + #[cfg(feature = "opendal-azdls")] OpenDalStorage::Azdls { configured_scheme, config, } => azdls_create_operator(path, config, configured_scheme)?, #[cfg(all( - not(feature = "storage-s3"), - not(feature = "storage-fs"), - not(feature = "storage-gcs"), - not(feature = "storage-oss"), - not(feature = "storage-azdls"), + not(feature = "opendal-s3"), + not(feature = "opendal-fs"), + not(feature = "opendal-gcs"), + not(feature = "opendal-oss"), + not(feature = "opendal-azdls"), ))] _ => { return Err(Error::new( @@ -327,12 +343,12 @@ impl OpenDalStorage { impl Storage for OpenDalStorage { async fn exists(&self, path: &str) -> Result { let (op, relative_path) = self.create_operator(&path)?; - Ok(op.exists(relative_path).await?) + Ok(op.exists(relative_path).await.map_err(from_opendal_error)?) } async fn metadata(&self, path: &str) -> Result { let (op, relative_path) = self.create_operator(&path)?; - let meta = op.stat(relative_path).await?; + let meta = op.stat(relative_path).await.map_err(from_opendal_error)?; Ok(FileMetadata { size: meta.content_length(), }) @@ -340,28 +356,38 @@ impl Storage for OpenDalStorage { async fn read(&self, path: &str) -> Result { let (op, relative_path) = self.create_operator(&path)?; - Ok(op.read(relative_path).await?.to_bytes()) + Ok(op + .read(relative_path) + .await + .map_err(from_opendal_error)? + .to_bytes()) } async fn reader(&self, path: &str) -> Result> { let (op, relative_path) = self.create_operator(&path)?; - Ok(Box::new(op.reader(relative_path).await?)) + Ok(Box::new(OpenDalReader( + op.reader(relative_path).await.map_err(from_opendal_error)?, + ))) } async fn write(&self, path: &str, bs: Bytes) -> Result<()> { let (op, relative_path) = self.create_operator(&path)?; - op.write(relative_path, bs).await?; + op.write(relative_path, bs) + .await + .map_err(from_opendal_error)?; Ok(()) } async fn writer(&self, path: &str) -> Result> { let (op, relative_path) = self.create_operator(&path)?; - Ok(Box::new(op.writer(relative_path).await?)) + Ok(Box::new(OpenDalWriter( + op.writer(relative_path).await.map_err(from_opendal_error)?, + ))) } async fn delete(&self, path: &str) -> Result<()> { let (op, relative_path) = self.create_operator(&path)?; - Ok(op.delete(relative_path).await?) + Ok(op.delete(relative_path).await.map_err(from_opendal_error)?) } async fn delete_prefix(&self, path: &str) -> Result<()> { @@ -371,7 +397,7 @@ impl Storage for OpenDalStorage { } else { format!("{relative_path}/") }; - Ok(op.remove_all(&path).await?) + Ok(op.remove_all(&path).await.map_err(from_opendal_error)?) } #[allow(unreachable_code, unused_variables)] @@ -385,23 +411,38 @@ impl Storage for OpenDalStorage { } } -// OpenDAL implementations for FileRead and FileWrite traits +// Newtype wrappers for opendal types to satisfy orphan rules. +// We can't implement iceberg's FileRead/FileWrite traits directly on opendal's +// Reader/Writer since neither trait nor type is defined in this crate. + +/// Wrapper around `opendal::Reader` that implements `FileRead`. +pub(crate) struct OpenDalReader(pub(crate) opendal::Reader); #[async_trait] -impl FileRead for opendal::Reader { - async fn read(&self, range: Range) -> Result { - Ok(opendal::Reader::read(self, range).await?.to_bytes()) +impl FileRead for OpenDalReader { + async fn read(&self, range: std::ops::Range) -> Result { + Ok(opendal::Reader::read(&self.0, range) + .await + .map_err(from_opendal_error)? + .to_bytes()) } } +/// Wrapper around `opendal::Writer` that implements `FileWrite`. +pub(crate) struct OpenDalWriter(pub(crate) opendal::Writer); + #[async_trait] -impl FileWrite for opendal::Writer { +impl FileWrite for OpenDalWriter { async fn write(&mut self, bs: Bytes) -> Result<()> { - Ok(opendal::Writer::write(self, bs).await?) + Ok(opendal::Writer::write(&mut self.0, bs) + .await + .map_err(from_opendal_error)?) } async fn close(&mut self) -> Result<()> { - let _ = opendal::Writer::close(self).await?; + let _ = opendal::Writer::close(&mut self.0) + .await + .map_err(from_opendal_error)?; Ok(()) } } @@ -410,7 +451,7 @@ impl FileWrite for opendal::Writer { mod tests { use super::*; - #[cfg(feature = "storage-memory")] + #[cfg(feature = "opendal-memory")] #[test] fn test_default_memory_operator() { let op = default_memory_operator(); diff --git a/crates/iceberg/src/io/storage/opendal/memory.rs b/crates/storage/opendal/src/memory.rs similarity index 84% rename from crates/iceberg/src/io/storage/opendal/memory.rs rename to crates/storage/opendal/src/memory.rs index b8023717b6..5957d0e848 100644 --- a/crates/iceberg/src/io/storage/opendal/memory.rs +++ b/crates/storage/opendal/src/memory.rs @@ -15,11 +15,14 @@ // specific language governing permissions and limitations // under the License. +use iceberg::Result; use opendal::Operator; use opendal::services::MemoryConfig; -use crate::Result; +use crate::utils::from_opendal_error; pub(crate) fn memory_config_build() -> Result { - Ok(Operator::from_config(MemoryConfig::default())?.finish()) + Ok(Operator::from_config(MemoryConfig::default()) + .map_err(from_opendal_error)? + .finish()) } diff --git a/crates/iceberg/src/io/storage/opendal/oss.rs b/crates/storage/opendal/src/oss.rs similarity index 89% rename from crates/iceberg/src/io/storage/opendal/oss.rs rename to crates/storage/opendal/src/oss.rs index 98b68f7cde..add8b7a0f7 100644 --- a/crates/iceberg/src/io/storage/opendal/oss.rs +++ b/crates/storage/opendal/src/oss.rs @@ -17,12 +17,13 @@ use std::collections::HashMap; +use iceberg::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; +use iceberg::{Error, ErrorKind, Result}; use opendal::services::OssConfig; use opendal::{Configurator, Operator}; use url::Url; -use crate::io::{OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_ENDPOINT}; -use crate::{Error, ErrorKind, Result}; +use crate::utils::from_opendal_error; /// Parse iceberg props to oss config. pub(crate) fn oss_config_parse(mut m: HashMap) -> Result { @@ -52,5 +53,5 @@ pub(crate) fn oss_config_build(cfg: &OssConfig, path: &str) -> Result let builder = cfg.clone().into_builder().bucket(bucket); - Ok(Operator::new(builder)?.finish()) + Ok(Operator::new(builder).map_err(from_opendal_error)?.finish()) } diff --git a/crates/iceberg/src/io/storage/opendal/s3.rs b/crates/storage/opendal/src/s3.rs similarity index 96% rename from crates/iceberg/src/io/storage/opendal/s3.rs rename to crates/storage/opendal/src/s3.rs index 6b4d64e3a6..7db88d273f 100644 --- a/crates/iceberg/src/io/storage/opendal/s3.rs +++ b/crates/storage/opendal/src/s3.rs @@ -19,19 +19,20 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use iceberg::io::{ + CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN, + S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD, + S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY, + S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE, +}; +use iceberg::{Error, ErrorKind, Result}; use opendal::services::S3Config; use opendal::{Configurator, Operator}; pub use reqsign::{AwsCredential, AwsCredentialLoad}; use reqwest::Client; use url::Url; -use crate::io::{ - CLIENT_REGION, S3_ACCESS_KEY_ID, S3_ALLOW_ANONYMOUS, S3_ASSUME_ROLE_ARN, - S3_ASSUME_ROLE_EXTERNAL_ID, S3_ASSUME_ROLE_SESSION_NAME, S3_DISABLE_CONFIG_LOAD, - S3_DISABLE_EC2_METADATA, S3_ENDPOINT, S3_PATH_STYLE_ACCESS, S3_REGION, S3_SECRET_ACCESS_KEY, - S3_SESSION_TOKEN, S3_SSE_KEY, S3_SSE_MD5, S3_SSE_TYPE, is_truthy, -}; -use crate::{Error, ErrorKind, Result}; +use crate::utils::{from_opendal_error, is_truthy}; /// Parse iceberg props to s3 config. pub(crate) fn s3_config_parse(mut m: HashMap) -> Result { @@ -141,7 +142,7 @@ pub(crate) fn s3_config_build( .customized_credential_load(customized_credential_load.clone().into_opendal_loader()); } - Ok(Operator::new(builder)?.finish()) + Ok(Operator::new(builder).map_err(from_opendal_error)?.finish()) } /// Custom AWS credential loader. diff --git a/crates/storage/opendal/src/utils.rs b/crates/storage/opendal/src/utils.rs new file mode 100644 index 0000000000..56f8c18059 --- /dev/null +++ b/crates/storage/opendal/src/utils.rs @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub(crate) fn is_truthy(value: &str) -> bool { + ["true", "t", "1", "on"].contains(&value.to_lowercase().as_str()) +} + +/// Convert an opendal error into an iceberg error. +pub(crate) fn from_opendal_error(e: opendal::Error) -> iceberg::Error { + iceberg::Error::new( + iceberg::ErrorKind::Unexpected, + "Failure in doing io operation", + ) + .with_source(e) +} diff --git a/crates/iceberg/tests/file_io_gcs_test.rs b/crates/storage/opendal/tests/file_io_gcs_test.rs similarity index 95% rename from crates/iceberg/tests/file_io_gcs_test.rs rename to crates/storage/opendal/tests/file_io_gcs_test.rs index 75bc9fae12..5e04491131 100644 --- a/crates/iceberg/tests/file_io_gcs_test.rs +++ b/crates/storage/opendal/tests/file_io_gcs_test.rs @@ -19,15 +19,14 @@ //! //! These tests assume Docker containers are started externally via `make docker-up`. -#[cfg(all(test, feature = "storage-gcs"))] +#[cfg(feature = "opendal-gcs")] mod tests { use std::collections::HashMap; use std::sync::Arc; use bytes::Bytes; - use iceberg::io::{ - FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH, OpenDalStorageFactory, - }; + use iceberg::io::{FileIO, FileIOBuilder, GCS_NO_AUTH, GCS_SERVICE_PATH}; + use iceberg_storage_opendal::OpenDalStorageFactory; use iceberg_test_utils::{get_gcs_endpoint, set_up}; static FAKE_GCS_BUCKET: &str = "test-bucket"; diff --git a/crates/iceberg/tests/file_io_s3_test.rs b/crates/storage/opendal/tests/file_io_s3_test.rs similarity index 97% rename from crates/iceberg/tests/file_io_s3_test.rs rename to crates/storage/opendal/tests/file_io_s3_test.rs index 1c029cdbc3..5801af0606 100644 --- a/crates/iceberg/tests/file_io_s3_test.rs +++ b/crates/storage/opendal/tests/file_io_s3_test.rs @@ -19,15 +19,15 @@ //! //! These tests assume Docker containers are started externally via `make docker-up`. //! Each test uses unique file paths based on module path to avoid conflicts. -#[cfg(all(test, feature = "storage-s3"))] +#[cfg(feature = "opendal-s3")] mod tests { use std::sync::Arc; use async_trait::async_trait; use iceberg::io::{ - CustomAwsCredentialLoader, FileIO, FileIOBuilder, OpenDalStorageFactory, S3_ACCESS_KEY_ID, - S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, + FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, }; + use iceberg_storage_opendal::{CustomAwsCredentialLoader, OpenDalStorageFactory}; use iceberg_test_utils::{get_minio_endpoint, normalize_test_name_with_parts, set_up}; use reqsign::{AwsCredential, AwsCredentialLoad}; use reqwest::Client;