diff --git a/Cargo.lock b/Cargo.lock index 0e21e9edf3..bc93074a41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5177,6 +5177,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "lance-namespace-datafusion" +version = "2.0.0-beta.8" +dependencies = [ + "arrow", + "arrow-array", + "arrow-schema", + "async-trait", + "dashmap", + "datafusion", + "datafusion-sql", + "lance", + "lance-namespace", + "lance-namespace-impls", + "tempfile", + "tokio", +] + [[package]] name = "lance-namespace-impls" version = "2.0.0-beta.8" diff --git a/Cargo.toml b/Cargo.toml index 373b2fd4b3..ef328806bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "rust/lance-linalg", "rust/lance-namespace", "rust/lance-namespace-impls", + "rust/lance-namespace-datafusion", "rust/lance-table", "rust/lance-test-macros", "rust/lance-testing", diff --git a/rust/lance-namespace-datafusion/Cargo.toml b/rust/lance-namespace-datafusion/Cargo.toml new file mode 100755 index 0000000000..a8d7987f4e --- /dev/null +++ b/rust/lance-namespace-datafusion/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "lance-namespace-datafusion" +description = "Lance namespace integration with Apache DataFusion catalogs and schemas" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +keywords.workspace = true +categories.workspace = true +rust-version.workspace = true + +[dependencies] +async-trait.workspace = true +dashmap = "6" +datafusion.workspace = true +lance.workspace = true +lance-namespace.workspace = true +tokio.workspace = true + +[dev-dependencies] +arrow.workspace = true +arrow-array.workspace = true +arrow-schema.workspace = true +datafusion-sql.workspace = true +lance-namespace-impls.workspace = true +tempfile.workspace = true + +[lints] +workspace = true diff --git a/rust/lance-namespace-datafusion/README.md b/rust/lance-namespace-datafusion/README.md new file mode 100755 index 0000000000..769bdb3f32 --- /dev/null +++ b/rust/lance-namespace-datafusion/README.md @@ -0,0 +1,46 @@ +# Lance Namespace-DataFusion Integration + +This crate provides a bridge between Lance Namespaces and Apache DataFusion, allowing Lance tables to be queried as if they were native DataFusion catalogs, schemas, and tables. + +It exposes a `SessionBuilder` that constructs a DataFusion `SessionContext` with `CatalogProvider` and `SchemaProvider` implementations backed by a `lance_namespace::LanceNamespace` instance. + +## Features + +- **Dynamic Catalogs**: Maps top-level Lance namespaces to DataFusion catalogs. +- **Dynamic Schemas**: Maps child namespaces to DataFusion schemas. +- **Lazy Table Loading**: Tables are loaded on-demand from the namespace when queried. +- **Read-Only**: This integration focuses solely on providing read access (SQL `SELECT`) to Lance datasets. DML operations are not included. + +## Usage + +First, build a `LanceNamespace` (e.g., from a directory), then use the `SessionBuilder` to create a `SessionContext`. + +```rust,ignore +use std::sync::Arc; +use datafusion::prelude::SessionContext; +use lance_namespace_datafusion::SessionBuilder; +use lance_namespace::LanceNamespace; +use lance_namespace_impls::DirectoryNamespaceBuilder; + +async fn run_query() { + // 1. Create a Lance Namespace + let temp_dir = tempfile::tempdir().unwrap(); + let ns: Arc = Arc::new( + DirectoryNamespaceBuilder::new(temp_dir.path().to_string_lossy().to_string()) + .build() + .await + .unwrap(), + ); + + // 2. Build a DataFusion SessionContext + let ctx = SessionBuilder::new() + .with_root(ns.into()) + .build() + .await + .unwrap(); + + // 3. Run a SQL query + let df = ctx.sql("SELECT * FROM my_catalog.my_schema.my_table").await.unwrap(); + df.show().await.unwrap(); +} +``` diff --git a/rust/lance-namespace-datafusion/src/catalog.rs b/rust/lance-namespace-datafusion/src/catalog.rs new file mode 100755 index 0000000000..43464b7459 --- /dev/null +++ b/rust/lance-namespace-datafusion/src/catalog.rs @@ -0,0 +1,142 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! DataFusion catalog providers backed by Lance namespaces. + +use std::any::Any; +use std::collections::HashSet; +use std::sync::Arc; + +use dashmap::DashMap; +use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider}; +use datafusion::error::Result; + +use crate::namespace_level::NamespaceLevel; +use crate::schema::LanceSchemaProvider; +#[allow(unused_imports)] +use crate::SessionBuilder; + +/// A dynamic [`CatalogProviderList`] that maps Lance namespaces to catalogs. +/// +/// The underlying namespace must be a four-level namespace. It is explicitly configured +/// via [`SessionBuilder::with_root`], and each child namespace under this root is +/// automatically registered as a [`LanceCatalogProvider`]. +#[derive(Debug, Clone)] +pub struct LanceCatalogProviderList { + /// Root Lance namespace used to resolve catalogs / schemas / tables. + #[allow(dead_code)] + ns_level: NamespaceLevel, + /// Catalogs that have been loaded from the root namespace. + /// + /// Note: The values in this map may become stale over time, as there is currently + /// no mechanism to automatically refresh or invalidate cached catalog providers. + catalogs: DashMap>, +} + +impl LanceCatalogProviderList { + pub async fn try_new(namespace: NamespaceLevel) -> Result { + let catalogs = DashMap::new(); + for child_namespace in namespace.children().await? { + let catalog_name = child_namespace.name().to_string(); + let catalog_provider = Arc::new(LanceCatalogProvider::try_new(child_namespace).await?); + catalogs.insert(catalog_name, catalog_provider as Arc); + } + + Ok(Self { + ns_level: namespace, + catalogs, + }) + } +} + +impl CatalogProviderList for LanceCatalogProviderList { + fn as_any(&self) -> &dyn Any { + self + } + + /// Adds a new catalog to this catalog list. + /// If a catalog of the same name existed before, it is replaced in the list and returned. + fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + self.catalogs.insert(name, catalog) + } + + fn catalog_names(&self) -> Vec { + self.catalogs + .iter() + .map(|entry| entry.key().clone()) + .collect::>() + .into_iter() + .collect() + } + + fn catalog(&self, name: &str) -> Option> { + self.catalogs + .get(name) + .map(|entry| Arc::clone(entry.value())) + } +} + +/// A dynamic [`CatalogProvider`] that exposes the immediate child namespaces +/// of a Lance namespace as database schemas. +/// +/// The underlying namespace must be a three-level namespace. It is either explicitly +/// registered via [`SessionBuilder::add_catalog`], or automatically created as part of +/// the catalog hierarchy when [`SessionBuilder::with_root`] is used. +/// Child namespaces are automatically loaded as [`LanceSchemaProvider`] instances. +#[derive(Debug, Clone)] +pub struct LanceCatalogProvider { + #[allow(dead_code)] + ns_level: NamespaceLevel, + /// Note: The values in this map may become stale over time, as there is currently + /// no mechanism to automatically refresh or invalidate cached schema providers. + schemas: DashMap>, +} + +impl LanceCatalogProvider { + pub async fn try_new(namespace: NamespaceLevel) -> Result { + let schemas = DashMap::new(); + for child_namespace in namespace.children().await? { + let schema_name = child_namespace.name().to_string(); + let schema_provider = Arc::new(LanceSchemaProvider::try_new(child_namespace).await?); + schemas.insert(schema_name, schema_provider as Arc); + } + + Ok(Self { + ns_level: namespace, + schemas, + }) + } +} + +impl CatalogProvider for LanceCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.schemas + .iter() + .map(|entry| entry.key().clone()) + .collect::>() + .into_iter() + .collect() + } + + fn schema(&self, schema_name: &str) -> Option> { + self.schemas + .get(schema_name) + .map(|entry| Arc::clone(entry.value())) + } + + fn register_schema( + &self, + name: &str, + schema: Arc, + ) -> Result>> { + Ok(self.schemas.insert(name.to_string(), schema)) + } +} diff --git a/rust/lance-namespace-datafusion/src/error.rs b/rust/lance-namespace-datafusion/src/error.rs new file mode 100755 index 0000000000..633e67d26d --- /dev/null +++ b/rust/lance-namespace-datafusion/src/error.rs @@ -0,0 +1,10 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use datafusion::error::DataFusionError; +use lance::Error; + +/// Converts a lance error into a datafusion error. +pub fn to_datafusion_error(error: Error) -> DataFusionError { + DataFusionError::External(error.into()) +} diff --git a/rust/lance-namespace-datafusion/src/lib.rs b/rust/lance-namespace-datafusion/src/lib.rs new file mode 100755 index 0000000000..bf4d5e3264 --- /dev/null +++ b/rust/lance-namespace-datafusion/src/lib.rs @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Lance integration between namespaces and Apache DataFusion catalogs. +//! +//! This crate provides adapters to expose Lance namespaces as +//! DataFusion `CatalogProviderList`, `CatalogProvider`, and +//! `SchemaProvider` implementations. It intentionally focuses on +//! read-only catalog and schema mapping. + +pub mod catalog; +pub mod error; +pub mod namespace_level; +pub mod schema; +pub mod session_builder; + +pub use catalog::{LanceCatalogProvider, LanceCatalogProviderList}; +pub use namespace_level::NamespaceLevel; +pub use schema::LanceSchemaProvider; +pub use session_builder::SessionBuilder; diff --git a/rust/lance-namespace-datafusion/src/namespace_level.rs b/rust/lance-namespace-datafusion/src/namespace_level.rs new file mode 100755 index 0000000000..2bcc282ade --- /dev/null +++ b/rust/lance-namespace-datafusion/src/namespace_level.rs @@ -0,0 +1,126 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; + +use lance::dataset::builder::DatasetBuilder; +use lance::{Dataset, Result}; +use lance_namespace::models::{ListNamespacesRequest, ListTablesRequest}; +use lance_namespace::LanceNamespace; + +const DEFAULT_CATALOG_NAME: &str = "lance"; + +/// Lightweight wrapper around a Lance namespace handle and identifier. +#[derive(Debug, Clone)] +pub struct NamespaceLevel { + root: Arc, + /// Full namespace identifier, e.g. [catalog, schema]. + namespace_id: Option>, +} + +impl From> for NamespaceLevel { + fn from(lance_namespace: Arc) -> Self { + Self::from_root(Arc::clone(&lance_namespace)) + } +} + +impl From<(Arc, String)> for NamespaceLevel { + fn from(lance_namespace: (Arc, String)) -> Self { + Self::from_namespace(Arc::clone(&lance_namespace.0), vec![lance_namespace.1]) + } +} + +impl From<(Arc, Vec)> for NamespaceLevel { + fn from(lance_namespace: (Arc, Vec)) -> Self { + Self::from_namespace(Arc::clone(&lance_namespace.0), lance_namespace.1) + } +} + +impl NamespaceLevel { + /// Construct a namespace rooted at the top-level Lance namespace. + pub fn from_root(root: Arc) -> Self { + Self { + root, + namespace_id: None, + } + } + + /// Construct a namespace for a specific child identifier under the root. + pub fn from_namespace(root: Arc, namespace_id: Vec) -> Self { + Self { + root, + namespace_id: Some(namespace_id), + } + } + + /// Return the full namespace identifier. + pub fn id(&self) -> Vec { + self.namespace_id.clone().unwrap_or_default() + } + + /// Human readable name for this namespace (last component or default). + pub fn name(&self) -> &str { + self.namespace_id + .as_deref() + .and_then(|v| v.last()) + .map_or(DEFAULT_CATALOG_NAME, |relative_name| relative_name.as_str()) + } + + fn child_id(&self, child_name: String) -> Vec { + match &self.namespace_id { + Some(namespace_id) => { + let mut child_namespace = namespace_id.clone(); + child_namespace.push(child_name); + child_namespace + } + None => vec![child_name], + } + } + + /// List direct child namespaces. + pub async fn children(&self) -> Result> { + let root = Arc::clone(&self.root); + let namespace_id = self.namespace_id.clone().unwrap_or_default(); + let request = ListNamespacesRequest { + id: Some(namespace_id.clone()), + page_token: None, + limit: None, + ..Default::default() + }; + + let namespaces = root.list_namespaces(request).await?.namespaces; + + Ok(namespaces + .into_iter() + .map(|relative_ns_id| { + Self::from_namespace(Arc::clone(&self.root), self.child_id(relative_ns_id)) + }) + .collect()) + } + + /// List table names under this namespace. + pub async fn tables(&self) -> Result> { + let root = Arc::clone(&self.root); + let namespace_id = self.namespace_id.clone().unwrap_or_default(); + let request = ListTablesRequest { + id: Some(namespace_id), + page_token: None, + limit: None, + ..Default::default() + }; + + root.list_tables(request).await.map(|resp| resp.tables) + } + + /// Load a Lance dataset for the given table name in this namespace. + pub async fn load_dataset(&self, table_name: &str) -> Result { + DatasetBuilder::from_namespace( + Arc::clone(&self.root), + self.child_id(table_name.to_string()), + false, + ) + .await? + .load() + .await + } +} diff --git a/rust/lance-namespace-datafusion/src/schema.rs b/rust/lance-namespace-datafusion/src/schema.rs new file mode 100755 index 0000000000..64b4251796 --- /dev/null +++ b/rust/lance-namespace-datafusion/src/schema.rs @@ -0,0 +1,91 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! DataFusion `SchemaProvider` backed by a Lance namespace. +//! +//! This provider resolves tables on demand from a Lance [`NamespaceLevel`] +//! and caches `LanceTableProvider` instances per table name. It focuses +//! on read-only access; DML is handled elsewhere. + +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use dashmap::DashMap; +use datafusion::catalog::SchemaProvider; +use datafusion::datasource::TableProvider; +use datafusion::error::Result; + +use crate::error::to_datafusion_error; +use crate::namespace_level::NamespaceLevel; +use lance::datafusion::LanceTableProvider; + +/// A dynamic [`SchemaProvider`] backed directly by a [`NamespaceLevel`]. +/// +/// Exposes Lance tables in the namespace as [`LanceTableProvider`] instances, +/// loaded on demand and cached by table name. +#[derive(Debug, Clone)] +pub struct LanceSchemaProvider { + ns_level: NamespaceLevel, + tables: DashMap>, +} + +impl LanceSchemaProvider { + pub async fn try_new(namespace: NamespaceLevel) -> Result { + Ok(Self { + ns_level: namespace, + tables: DashMap::new(), + }) + } + + async fn load_and_cache_table( + &self, + table_name: &str, + ) -> Result>> { + let dataset = self + .ns_level + .load_dataset(table_name) + .await + .map_err(to_datafusion_error)?; + let dataset = Arc::new(dataset); + let table_provider = Arc::new(LanceTableProvider::new(dataset, false, false)); + self.tables + .insert(table_name.to_string(), Arc::clone(&table_provider)); + Ok(Some(table_provider as Arc)) + } +} + +#[async_trait] +impl SchemaProvider for LanceSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.tables + .iter() + .map(|entry| entry.key().clone()) + .collect() + } + + async fn table(&self, table_name: &str) -> Result>> { + if let Some(existing) = self.tables.get(table_name) { + // Reuse cached provider when still fresh; otherwise reload. + let ds = existing.dataset(); + let latest = ds.latest_version_id().await.map_err(to_datafusion_error)?; + let is_stale = latest != ds.version().version; + if is_stale { + self.tables.remove(table_name); + self.load_and_cache_table(table_name).await + } else { + Ok(Some(Arc::clone(existing.value()) as Arc)) + } + } else { + self.load_and_cache_table(table_name).await + } + } + + fn table_exist(&self, name: &str) -> bool { + self.tables.contains_key(name) + } +} diff --git a/rust/lance-namespace-datafusion/src/session_builder.rs b/rust/lance-namespace-datafusion/src/session_builder.rs new file mode 100755 index 0000000000..0ee207e3cb --- /dev/null +++ b/rust/lance-namespace-datafusion/src/session_builder.rs @@ -0,0 +1,85 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Builder for constructing DataFusion `SessionContext` instances +//! backed by Lance namespaces. +//! +//! This API is intentionally minimal and returns the native +//! `SessionContext` without adding an extra wrapper. It wires up +//! dynamic catalog and schema providers so that Lance namespaces can +//! be accessed via `catalog.schema.table` names. + +use std::sync::Arc; + +use datafusion::error::Result; +use datafusion::execution::context::{SessionConfig, SessionContext}; + +use crate::catalog::LanceCatalogProviderList; +use crate::namespace_level::NamespaceLevel; +use crate::LanceCatalogProvider; + +/// Builder for configuring a `SessionContext` with Lance namespaces. +#[derive(Clone, Debug, Default)] +pub struct SessionBuilder { + /// Optional root namespace exposed via a dynamic + /// `LanceCatalogProviderList`. + root: Option, + /// Explicit catalogs to register by name. + catalogs: Vec<(String, NamespaceLevel)>, + /// Optional DataFusion session configuration. + config: Option, +} + +impl SessionBuilder { + /// Create a new builder with no namespaces or configuration. + pub fn new() -> Self { + Self::default() + } + + /// Attach a root `LanceNamespace` that is exposed as a dynamic + /// catalog list via `LanceCatalogProviderList`. + pub fn with_root(mut self, ns: NamespaceLevel) -> Self { + self.root = Some(ns); + self + } + + /// Register an additional catalog backed by the given namespace. + /// + /// The catalog is identified by `name` and can later be combined + /// with schemas via `SessionBuilder::add_schema` using the same + /// namespace. + pub fn add_catalog(mut self, name: &str, ns: NamespaceLevel) -> Self { + self.catalogs.push((name.to_string(), ns)); + self + } + + /// Provide an explicit `SessionConfig` for the underlying + /// `SessionContext`. + pub fn with_config(mut self, config: SessionConfig) -> Self { + self.config = Some(config); + self + } + + /// Build a `SessionContext` with all configured namespaces. + pub async fn build(self) -> Result { + let ctx = if let Some(config) = self.config { + SessionContext::new_with_config(config) + } else { + SessionContext::new() + }; + + if let Some(root) = self.root { + let catalog_list = Arc::new(LanceCatalogProviderList::try_new(root).await?); + ctx.register_catalog_list(catalog_list); + } + + for (catalog_name, namespace) in self.catalogs { + ctx.register_catalog( + catalog_name, + Arc::new(LanceCatalogProvider::try_new(namespace).await?), + ); + } + + Ok(ctx) + } +} diff --git a/rust/lance-namespace-datafusion/tests/sql.rs b/rust/lance-namespace-datafusion/tests/sql.rs new file mode 100755 index 0000000000..3242f5ce23 --- /dev/null +++ b/rust/lance-namespace-datafusion/tests/sql.rs @@ -0,0 +1,380 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; + +use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator, StringArray}; +use arrow_schema::Schema; +use datafusion::common::record_batch; +use datafusion::error::{DataFusionError, Result as DFResult}; +use datafusion::prelude::SessionContext; +use lance::dataset::{WriteMode, WriteParams}; +use lance::Dataset; +use lance_namespace::models::CreateNamespaceRequest; +use lance_namespace::LanceNamespace; +use lance_namespace_datafusion::{NamespaceLevel, SessionBuilder}; +use lance_namespace_impls::DirectoryNamespaceBuilder; +use tempfile::TempDir; + +struct Context { + #[allow(dead_code)] + root_dir: TempDir, + #[allow(dead_code)] + extra_dir: TempDir, + ctx: SessionContext, +} + +fn col(batch: &RecordBatch, idx: usize) -> &T { + batch.column(idx).as_any().downcast_ref::().unwrap() +} + +fn customers_data() -> (Arc, RecordBatch) { + let batch = record_batch!( + ("customer_id", Int32, vec![1, 2, 3]), + ("name", Utf8, vec!["Alice", "Bob", "Carol"]), + ("city", Utf8, vec!["NY", "SF", "LA"]) + ) + .unwrap(); + let schema = batch.schema(); + + (schema, batch) +} + +fn orders_data() -> (Arc, RecordBatch) { + let batch = record_batch!( + ("order_id", Int32, vec![101, 102, 103]), + ("customer_id", Int32, vec![1, 2, 3]), + ("amount", Int32, vec![100, 200, 300]) + ) + .unwrap(); + let schema = batch.schema(); + + (schema, batch) +} + +fn orders2_data() -> (Arc, RecordBatch) { + let batch = record_batch!( + ("order_id", Int32, vec![201, 202]), + ("customer_id", Int32, vec![1, 2]), + ("amount", Int32, vec![150, 250]) + ) + .unwrap(); + let schema = batch.schema(); + + (schema, batch) +} + +fn customers_dim_data() -> (Arc, RecordBatch) { + let batch = record_batch!( + ("customer_id", Int32, vec![1, 2, 3]), + ("segment", Utf8, vec!["Silver", "Gold", "Platinum"]) + ) + .unwrap(); + let schema = batch.schema(); + + (schema, batch) +} + +async fn write_table( + dir: &TempDir, + file_name: &str, + schema: Arc, + batch: RecordBatch, +) -> DFResult<()> { + let full_path = dir.path().join(file_name); + if let Some(parent) = full_path.parent() { + std::fs::create_dir_all(parent)?; + } + + let uri = full_path.to_str().unwrap().to_string(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + let write_params = WriteParams { + mode: WriteMode::Create, + ..Default::default() + }; + + Dataset::write(reader, &uri, Some(write_params)) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + Ok(()) +} + +async fn setup_test_context() -> DFResult { + let root_dir = TempDir::new()?; + let extra_dir = TempDir::new()?; + + let (customers_schema, customers_batch) = customers_data(); + write_table( + &root_dir, + "retail$sales$customers.lance", + customers_schema, + customers_batch, + ) + .await?; + + let (orders_schema, orders_batch) = orders_data(); + write_table( + &root_dir, + "retail$sales$orders.lance", + orders_schema, + orders_batch, + ) + .await?; + + let (orders2_schema, orders2_batch) = orders2_data(); + write_table( + &root_dir, + "wholesale$sales2$orders2.lance", + orders2_schema, + orders2_batch, + ) + .await?; + + let (dim_schema, dim_batch) = customers_dim_data(); + write_table( + &extra_dir, + "crm$dim$customers_dim.lance", + dim_schema, + dim_batch, + ) + .await?; + + let root_path = root_dir.path().to_string_lossy().to_string(); + let root_dir_ns = DirectoryNamespaceBuilder::new(root_path) + .manifest_enabled(true) + .dir_listing_enabled(true) + .build() + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let extra_path = extra_dir.path().to_string_lossy().to_string(); + let extra_dir_ns = DirectoryNamespaceBuilder::new(extra_path) + .manifest_enabled(true) + .dir_listing_enabled(true) + .build() + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + // Create nested namespaces for retail / wholesale / crm. + let mut create_retail = CreateNamespaceRequest::new(); + create_retail.id = Some(vec!["retail".to_string()]); + root_dir_ns + .create_namespace(create_retail) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut create_sales = CreateNamespaceRequest::new(); + create_sales.id = Some(vec!["retail".to_string(), "sales".to_string()]); + root_dir_ns + .create_namespace(create_sales) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut create_wholesale = CreateNamespaceRequest::new(); + create_wholesale.id = Some(vec!["wholesale".to_string()]); + root_dir_ns + .create_namespace(create_wholesale) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut create_sales2 = CreateNamespaceRequest::new(); + create_sales2.id = Some(vec!["wholesale".to_string(), "sales2".to_string()]); + root_dir_ns + .create_namespace(create_sales2) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut create_crm = CreateNamespaceRequest::new(); + create_crm.id = Some(vec!["crm".to_string()]); + extra_dir_ns + .create_namespace(create_crm) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut create_dim = CreateNamespaceRequest::new(); + create_dim.id = Some(vec!["crm".to_string(), "dim".to_string()]); + extra_dir_ns + .create_namespace(create_dim) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + root_dir_ns + .migrate() + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + extra_dir_ns + .migrate() + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let root_ns: Arc = Arc::new(root_dir_ns); + let extra_ns: Arc = Arc::new(extra_dir_ns); + + let ctx = SessionBuilder::new() + .with_root(NamespaceLevel::from_root(Arc::clone(&root_ns))) + .add_catalog( + "crm", + NamespaceLevel::from_namespace(Arc::clone(&extra_ns), vec!["crm".to_string()]), + ) + .build() + .await?; + + Ok(Context { + root_dir, + extra_dir, + ctx, + }) +} + +#[tokio::test] +async fn join_within_retail() -> DFResult<()> { + let ns = setup_test_context().await?; + + let df = ns + .ctx + .sql( + "SELECT customers.name, orders.amount \ + FROM retail.sales.customers customers \ + JOIN retail.sales.orders orders \ + ON customers.customer_id = orders.customer_id \ + WHERE customers.customer_id = 2", + ) + .await?; + let batches = df.collect().await?; + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + + let name_col = col::(batch, 0); + let amount_col = col::(batch, 1); + + assert_eq!(name_col.value(0), "Bob"); + assert_eq!(amount_col.value(0), 200); + + Ok(()) +} + +#[tokio::test] +async fn join_across_root_catalogs() -> DFResult<()> { + let ns = setup_test_context().await?; + + let df = ns + .ctx + .sql( + "SELECT c.name, o2.amount \ + FROM retail.sales.customers c \ + JOIN wholesale.sales2.orders2 o2 \ + ON c.customer_id = o2.customer_id \ + WHERE o2.order_id = 202", + ) + .await?; + let batches = df.collect().await?; + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + + let name_col = col::(batch, 0); + let amount_col = col::(batch, 1); + + assert_eq!(name_col.value(0), "Bob"); + assert_eq!(amount_col.value(0), 250); + + Ok(()) +} + +#[tokio::test] +async fn join_across_catalogs() -> DFResult<()> { + let ns = setup_test_context().await?; + + let df = ns + .ctx + .sql( + "SELECT customers.name, dim.segment \ + FROM retail.sales.customers customers \ + JOIN crm.dim.customers_dim dim \ + ON customers.customer_id = dim.customer_id \ + WHERE customers.customer_id = 3", + ) + .await?; + let batches = df.collect().await?; + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + + let name_col = col::(batch, 0); + let segment_col = col::(batch, 1); + + assert_eq!(name_col.value(0), "Carol"); + assert_eq!(segment_col.value(0), "Platinum"); + + Ok(()) +} + +#[tokio::test] +async fn aggregation_city_totals() -> DFResult<()> { + let ns = setup_test_context().await?; + + let df = ns + .ctx + .sql( + "SELECT city, SUM(amount) AS total \ + FROM retail.sales.orders o \ + JOIN retail.sales.customers c \ + ON c.customer_id = o.customer_id \ + GROUP BY city \ + ORDER BY city", + ) + .await?; + let batches = df.collect().await?; + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 3); + + let city_col = col::(batch, 0); + let total_col = col::(batch, 1); + + assert_eq!(city_col.value(0), "LA"); + assert_eq!(total_col.value(0), 300); + + assert_eq!(city_col.value(1), "NY"); + assert_eq!(total_col.value(1), 100); + + assert_eq!(city_col.value(2), "SF"); + assert_eq!(total_col.value(2), 200); + + Ok(()) +} + +#[tokio::test] +async fn cte_view_customer_orders() -> DFResult<()> { + let ns = setup_test_context().await?; + + let df = ns + .ctx + .sql( + "WITH customer_orders AS ( \ + SELECT c.customer_id, c.name, o.order_id, o.amount \ + FROM retail.sales.customers c \ + JOIN retail.sales.orders o \ + ON c.customer_id = o.customer_id \ + ) \ + SELECT order_id, name, amount FROM customer_orders WHERE customer_id = 1", + ) + .await?; + let batches = df.collect().await?; + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + + let order_id_col = col::(batch, 0); + let name_col = col::(batch, 1); + let amount_col = col::(batch, 2); + + assert_eq!(order_id_col.value(0), 101); + assert_eq!(name_col.value(0), "Alice"); + assert_eq!(amount_col.value(0), 100); + + Ok(()) +}