diff --git a/Cargo.lock b/Cargo.lock index 054de94e68..cdb8341bf7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3611,6 +3611,7 @@ dependencies = [ "enum-ordinalize", "env_logger", "iceberg", + "iceberg-catalog-loader", "iceberg-datafusion", "indicatif", "libtest-mimic", diff --git a/Cargo.toml b/Cargo.toml index e114217b29..1244cee98d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -84,6 +84,7 @@ iceberg-catalog-hms = { version = "0.8.0", path = "./crates/catalog/hms" } iceberg-catalog-rest = { version = "0.8.0", path = "./crates/catalog/rest" } iceberg-catalog-s3tables = { version = "0.8.0", path = "./crates/catalog/s3tables" } iceberg-catalog-sql = { version = "0.8.0", path = "./crates/catalog/sql" } +iceberg-catalog-loader = { version = "0.8.0", path = "./crates/catalog/loader" } iceberg-datafusion = { version = "0.8.0", path = "./crates/integrations/datafusion" } indicatif = "0.18" itertools = "0.13" diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index e118ef86a9..076de1ad8a 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use iceberg::memory::MemoryCatalogBuilder; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; use iceberg_catalog_glue::GlueCatalogBuilder; use iceberg_catalog_hms::HmsCatalogBuilder; @@ -31,6 +32,7 @@ type CatalogBuilderFactory = fn() -> Box; /// A registry of catalog builders. static CATALOG_REGISTRY: &[(&str, CatalogBuilderFactory)] = &[ + ("memory", || Box::new(MemoryCatalogBuilder::default())), ("rest", || Box::new(RestCatalogBuilder::default())), ("glue", || Box::new(GlueCatalogBuilder::default())), ("s3tables", || Box::new(S3TablesCatalogBuilder::default())), @@ -121,6 +123,23 @@ mod tests { assert!(result.is_err()); } + #[tokio::test] + async fn test_catalog_loader_pattern_memory_catalog() { + use iceberg::memory::MEMORY_CATALOG_WAREHOUSE; + + let catalog = CatalogLoader::from("memory") + .load( + "memory".to_string(), + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + "memory://test".to_string(), + )]), + ) + .await; + + assert!(catalog.is_ok()); + } + #[tokio::test] async fn test_catalog_loader_pattern() { use iceberg_catalog_rest::REST_CATALOG_PROP_URI; diff --git a/crates/sqllogictest/Cargo.toml b/crates/sqllogictest/Cargo.toml index e826ad7ae0..0c3280ee70 100644 --- a/crates/sqllogictest/Cargo.toml +++ b/crates/sqllogictest/Cargo.toml @@ -32,6 +32,7 @@ datafusion-sqllogictest = { workspace = true } enum-ordinalize = { workspace = true } env_logger = { workspace = true } iceberg = { workspace = true } +iceberg-catalog-loader = { workspace = true } iceberg-datafusion = { workspace = true } indicatif = { workspace = true } log = { workspace = true } diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index 7af647be96..d8483426ed 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -22,9 +22,10 @@ use std::sync::Arc; use datafusion::catalog::CatalogProvider; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_sqllogictest::DataFusion; -use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; +use iceberg::memory::MEMORY_CATALOG_WAREHOUSE; use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec}; -use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation}; +use iceberg::{Catalog, NamespaceIdent, TableCreation}; +use iceberg_catalog_loader::CatalogLoader; use iceberg_datafusion::IcebergCatalogProvider; use indicatif::ProgressBar; @@ -75,38 +76,64 @@ impl DataFusionEngine { } async fn create_catalog( - _catalog_config: Option<&DatafusionCatalogConfig>, + catalog_config: Option<&DatafusionCatalogConfig>, ) -> anyhow::Result> { - // TODO: Use catalog_config to load different catalog types via iceberg-catalog-loader - // See: https://github.com/apache/iceberg-rust/issues/1780 - let catalog = MemoryCatalogBuilder::default() - .load( - "memory", - HashMap::from([( - MEMORY_CATALOG_WAREHOUSE.to_string(), - "memory://".to_string(), - )]), - ) - .await?; + let catalog: Arc = match catalog_config { + Some(config) => Self::load_catalog_from_config(config).await?, + None => Self::create_default_memory_catalog().await?, + }; // Create a test namespace for INSERT INTO tests let namespace = NamespaceIdent::new("default".to_string()); - catalog.create_namespace(&namespace, HashMap::new()).await?; + if catalog.create_namespace(&namespace, HashMap::new()).await.is_err() { + // Namespace may already exist, ignore the error + } // Create partitioned test table (unpartitioned tables are now created via SQL) - Self::create_partitioned_table(&catalog, &namespace).await?; - Self::create_binary_table(&catalog, &namespace).await?; + // Ignore errors as tables may already exist + let _ = Self::create_partitioned_table(catalog.as_ref(), &namespace).await; + let _ = Self::create_binary_table(catalog.as_ref(), &namespace).await; Ok(Arc::new( - IcebergCatalogProvider::try_new(Arc::new(catalog)).await?, + IcebergCatalogProvider::try_new(catalog).await?, )) } + async fn load_catalog_from_config( + config: &DatafusionCatalogConfig, + ) -> anyhow::Result> { + let catalog_type = config.catalog_type.as_str(); + let props = config.props.clone(); + + let catalog = CatalogLoader::from(catalog_type) + .load(catalog_type.to_string(), props) + .await + .map_err(|e| anyhow::anyhow!("Failed to load catalog '{}': {}", catalog_type, e))?; + + Ok(catalog) + } + + /// Not sure if we want to keep a default memory catalog when no config is provided. + async fn create_default_memory_catalog() -> anyhow::Result> { + let catalog = CatalogLoader::from("memory") + .load( + "memory".to_string(), + HashMap::from([( + MEMORY_CATALOG_WAREHOUSE.to_string(), + "memory://".to_string(), + )]), + ) + .await + .map_err(|e| anyhow::anyhow!("Failed to create default memory catalog: {}", e))?; + + Ok(catalog) + } + /// Create a partitioned test table with id, category, and value columns /// Partitioned by category using identity transform /// TODO: this can be removed when we support CREATE EXTERNAL TABLE async fn create_partitioned_table( - catalog: &impl Catalog, + catalog: &dyn Catalog, namespace: &NamespaceIdent, ) -> anyhow::Result<()> { let schema = Schema::builder() @@ -140,7 +167,7 @@ impl DataFusionEngine { /// Used for testing binary predicate pushdown /// TODO: this can be removed when we support CREATE TABLE async fn create_binary_table( - catalog: &impl Catalog, + catalog: &dyn Catalog, namespace: &NamespaceIdent, ) -> anyhow::Result<()> { let schema = Schema::builder() diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs index a276671401..8eb6ecfa2c 100644 --- a/crates/sqllogictest/src/engine/mod.rs +++ b/crates/sqllogictest/src/engine/mod.rs @@ -140,4 +140,32 @@ mod tests { let result = load_engine_runner(config).await; assert!(result.is_ok()); } + + #[tokio::test] + async fn test_load_datafusion_with_memory_catalog() { + let config = EngineConfig::Datafusion { + catalog: Some(DatafusionCatalogConfig { + catalog_type: "memory".to_string(), + props: std::collections::HashMap::from([ + ("warehouse".to_string(), "memory://test".to_string()), + ]), + }), + }; + + let result = load_engine_runner(config).await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_load_datafusion_with_unsupported_catalog() { + let config = EngineConfig::Datafusion { + catalog: Some(DatafusionCatalogConfig { + catalog_type: "unsupported_catalog".to_string(), + props: std::collections::HashMap::new(), + }), + }; + + let result = load_engine_runner(config).await; + assert!(result.is_err()); + } } diff --git a/crates/sqllogictest/testdata/schedules/df_with_catalog_config.toml b/crates/sqllogictest/testdata/schedules/df_with_catalog_config.toml new file mode 100644 index 0000000000..861fe2af8d --- /dev/null +++ b/crates/sqllogictest/testdata/schedules/df_with_catalog_config.toml @@ -0,0 +1,30 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + + +[engines.df] +type = "datafusion" + +[engines.df.catalog] +type = "memory" + +[engines.df.catalog.props] +warehouse = "memory://test-warehouse" + +[[steps]] +engine = "df" +slt = "df_test/show_tables.slt"