From 3e79cb1255f011c3d251fd29563393310e5221b6 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Sun, 11 Jan 2026 22:50:04 +0800 Subject: [PATCH 1/7] init --- Cargo.lock | 18 + Cargo.toml | 1 + rust/lance-namespace-datafusion/Cargo.toml | 30 ++ rust/lance-namespace-datafusion/README.md | 46 ++ .../lance-namespace-datafusion/src/catalog.rs | 127 ++++++ rust/lance-namespace-datafusion/src/error.rs | 16 + rust/lance-namespace-datafusion/src/lib.rs | 20 + .../src/namespace.rs | 126 ++++++ rust/lance-namespace-datafusion/src/schema.rs | 92 ++++ .../src/session_builder.rs | 85 ++++ rust/lance-namespace-datafusion/tests/sql.rs | 418 ++++++++++++++++++ 11 files changed, 979 insertions(+) create mode 100755 rust/lance-namespace-datafusion/Cargo.toml create mode 100755 rust/lance-namespace-datafusion/README.md create mode 100755 rust/lance-namespace-datafusion/src/catalog.rs create mode 100755 rust/lance-namespace-datafusion/src/error.rs create mode 100755 rust/lance-namespace-datafusion/src/lib.rs create mode 100755 rust/lance-namespace-datafusion/src/namespace.rs create mode 100755 rust/lance-namespace-datafusion/src/schema.rs create mode 100755 rust/lance-namespace-datafusion/src/session_builder.rs create mode 100755 rust/lance-namespace-datafusion/tests/sql.rs diff --git a/Cargo.lock b/Cargo.lock index 28e0069601..f2a1615be1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5158,6 +5158,24 @@ dependencies = [ "tokio", ] +[[package]] +name = "lance-namespace-datafusion" +version = "2.0.0-beta.4" +dependencies = [ + "arrow", + "arrow-array", + "arrow-schema", + "async-trait", + "dashmap", + "datafusion", + "datafusion-sql", + "lance", + "lance-namespace", + "lance-namespace-impls", + "tempfile", + "tokio", +] + [[package]] name = "lance-namespace-impls" version = "2.0.0-beta.4" diff --git a/Cargo.toml b/Cargo.toml index cd13bfc816..ef6ea3785a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,7 @@ members = [ "rust/lance-linalg", "rust/lance-namespace", "rust/lance-namespace-impls", + "rust/lance-namespace-datafusion", "rust/lance-table", "rust/lance-test-macros", "rust/lance-testing", diff --git a/rust/lance-namespace-datafusion/Cargo.toml b/rust/lance-namespace-datafusion/Cargo.toml new file mode 100755 index 0000000000..a8d7987f4e --- /dev/null +++ b/rust/lance-namespace-datafusion/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "lance-namespace-datafusion" +description = "Lance namespace integration with Apache DataFusion catalogs and schemas" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +keywords.workspace = true +categories.workspace = true +rust-version.workspace = true + +[dependencies] +async-trait.workspace = true +dashmap = "6" +datafusion.workspace = true +lance.workspace = true +lance-namespace.workspace = true +tokio.workspace = true + +[dev-dependencies] +arrow.workspace = true +arrow-array.workspace = true +arrow-schema.workspace = true +datafusion-sql.workspace = true +lance-namespace-impls.workspace = true +tempfile.workspace = true + +[lints] +workspace = true diff --git a/rust/lance-namespace-datafusion/README.md b/rust/lance-namespace-datafusion/README.md new file mode 100755 index 0000000000..769bdb3f32 --- /dev/null +++ b/rust/lance-namespace-datafusion/README.md @@ -0,0 +1,46 @@ +# Lance Namespace-DataFusion Integration + +This crate provides a bridge between Lance Namespaces and Apache DataFusion, allowing Lance tables to be queried as if they were native DataFusion catalogs, schemas, and 
tables. + +It exposes a `SessionBuilder` that constructs a DataFusion `SessionContext` with `CatalogProvider` and `SchemaProvider` implementations backed by a `lance_namespace::LanceNamespace` instance. + +## Features + +- **Dynamic Catalogs**: Maps top-level Lance namespaces to DataFusion catalogs. +- **Dynamic Schemas**: Maps child namespaces to DataFusion schemas. +- **Lazy Table Loading**: Tables are loaded on-demand from the namespace when queried. +- **Read-Only**: This integration focuses solely on providing read access (SQL `SELECT`) to Lance datasets. DML operations are not included. + +## Usage + +First, build a `LanceNamespace` (e.g., from a directory), then use the `SessionBuilder` to create a `SessionContext`. + +```rust,ignore +use std::sync::Arc; +use datafusion::prelude::SessionContext; +use lance_namespace_datafusion::SessionBuilder; +use lance_namespace::LanceNamespace; +use lance_namespace_impls::DirectoryNamespaceBuilder; + +async fn run_query() { + // 1. Create a Lance Namespace + let temp_dir = tempfile::tempdir().unwrap(); + let ns: Arc = Arc::new( + DirectoryNamespaceBuilder::new(temp_dir.path().to_string_lossy().to_string()) + .build() + .await + .unwrap(), + ); + + // 2. Build a DataFusion SessionContext + let ctx = SessionBuilder::new() + .with_root(ns.into()) + .build() + .await + .unwrap(); + + // 3. Run a SQL query + let df = ctx.sql("SELECT * FROM my_catalog.my_schema.my_table").await.unwrap(); + df.show().await.unwrap(); +} +``` diff --git a/rust/lance-namespace-datafusion/src/catalog.rs b/rust/lance-namespace-datafusion/src/catalog.rs new file mode 100755 index 0000000000..9ccc9cde73 --- /dev/null +++ b/rust/lance-namespace-datafusion/src/catalog.rs @@ -0,0 +1,127 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! DataFusion catalog providers backed by Lance namespaces. + +use std::any::Any; +use std::collections::HashSet; +use std::sync::Arc; + +use dashmap::DashMap; +use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider}; +use datafusion::error::Result; + +use crate::namespace::Namespace; +use crate::schema::LanceSchemaProvider; + +/// A dynamic [`CatalogProviderList`] that maps Lance namespaces to catalogs. +/// +/// Top-level namespaces (e.g. `ListNamespacesRequest { id: Some(vec![]) }`) +/// are exposed as catalog names. Child namespaces under a given catalog are +/// exposed as schemas via [`LanceCatalogProvider`]. +#[derive(Debug, Clone)] +pub struct LanceCatalogProviderList { + /// Root Lance namespace used to resolve catalogs / schemas / tables. + #[allow(dead_code)] + namespace: Namespace, + /// Catalogs that have been loaded from the root namespace. + catalogs: DashMap>, +} + +impl LanceCatalogProviderList { + pub async fn try_new(namespace: Namespace) -> Result { + let catalogs = DashMap::new(); + for child_namespace in namespace.children().await? { + let catalog_name = child_namespace.name().to_string(); + let catalog_provider = Arc::new(LanceCatalogProvider::try_new(child_namespace).await?); + catalogs.insert(catalog_name, catalog_provider as Arc); + } + + Ok(Self { + namespace, + catalogs, + }) + } +} + +impl CatalogProviderList for LanceCatalogProviderList { + fn as_any(&self) -> &dyn Any { + self + } + + /// Adds a new catalog to this catalog list. + /// If a catalog of the same name existed before, it is replaced in the list and returned. 
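+    ///
+    /// Illustrative usage (hypothetical names; in normal use the list is
+    /// populated automatically from the root namespace in `try_new`):
+    ///
+    /// ```rust,ignore
+    /// let replaced = catalog_list.register_catalog(
+    ///     "retail".to_string(),
+    ///     Arc::new(LanceCatalogProvider::try_new(retail_ns).await?),
+    /// );
+    /// assert!(replaced.is_none());
+    /// ```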
+ fn register_catalog( + &self, + name: String, + catalog: Arc, + ) -> Option> { + self.catalogs.insert(name, catalog) + } + + fn catalog_names(&self) -> Vec { + self.catalogs + .iter() + .map(|entry| entry.key().clone()) + .collect::>() + .into_iter() + .collect() + } + + fn catalog(&self, name: &str) -> Option> { + self.catalogs + .get(name) + .map(|entry| Arc::clone(entry.value())) + } +} + +/// Dynamic [`CatalogProvider`] that views child namespaces under a single +/// top-level namespace as schemas. +#[derive(Debug, Clone)] +pub struct LanceCatalogProvider { + #[allow(dead_code)] + namespace: Namespace, + schemas: DashMap>, +} + +impl LanceCatalogProvider { + pub async fn try_new(namespace: Namespace) -> Result { + let schemas = DashMap::new(); + for child_namespace in namespace.children().await? { + let schema_name = child_namespace.name().to_string(); + let schema_provider = Arc::new(LanceSchemaProvider::try_new(child_namespace).await?); + schemas.insert(schema_name, schema_provider as Arc); + } + + Ok(Self { namespace, schemas }) + } +} + +impl CatalogProvider for LanceCatalogProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema_names(&self) -> Vec { + self.schemas + .iter() + .map(|entry| entry.key().clone()) + .collect::>() + .into_iter() + .collect() + } + + fn schema(&self, schema_name: &str) -> Option> { + self.schemas + .get(schema_name) + .map(|entry| Arc::clone(entry.value())) + } + + fn register_schema( + &self, + name: &str, + schema: Arc, + ) -> Result>> { + Ok(self.schemas.insert(name.to_string(), schema)) + } +} diff --git a/rust/lance-namespace-datafusion/src/error.rs b/rust/lance-namespace-datafusion/src/error.rs new file mode 100755 index 0000000000..321023a38e --- /dev/null +++ b/rust/lance-namespace-datafusion/src/error.rs @@ -0,0 +1,16 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use datafusion::error::{DataFusionError, Result}; + +/// Convert a Lance error into a DataFusion error. +/// +/// This keeps all Lance-specific error formatting in a single place. +pub fn to_datafusion_error(err: E) -> DataFusionError { + DataFusionError::Execution(err.to_string()) +} + +/// Convenience helper for wrapping fallible operations. +pub fn df_result(res: std::result::Result) -> Result { + res.map_err(to_datafusion_error) +} diff --git a/rust/lance-namespace-datafusion/src/lib.rs b/rust/lance-namespace-datafusion/src/lib.rs new file mode 100755 index 0000000000..6ef8fe7a42 --- /dev/null +++ b/rust/lance-namespace-datafusion/src/lib.rs @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Lance integration between namespaces and Apache DataFusion catalogs. +//! +//! This crate provides adapters to expose Lance namespaces as +//! DataFusion `CatalogProviderList`, `CatalogProvider`, and +//! `SchemaProvider` implementations. It intentionally focuses on +//! read-only catalog and schema mapping. 
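+//!
+//! A minimal usage sketch (illustrative names; assumes a `LanceNamespace`
+//! implementation such as the directory namespace from `lance-namespace-impls`):
+//!
+//! ```rust,ignore
+//! use lance_namespace_datafusion::SessionBuilder;
+//!
+//! let ctx = SessionBuilder::new()
+//!     .with_root(namespace.into())
+//!     .build()
+//!     .await?;
+//! let df = ctx.sql("SELECT * FROM my_catalog.my_schema.my_table").await?;
+//! df.show().await?;
+//! ```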
+ +pub mod catalog; +pub mod error; +pub mod namespace; +pub mod schema; +pub mod session_builder; + +pub use catalog::{LanceCatalogProvider, LanceCatalogProviderList}; +pub use namespace::Namespace; +pub use schema::LanceSchemaProvider; +pub use session_builder::SessionBuilder; diff --git a/rust/lance-namespace-datafusion/src/namespace.rs b/rust/lance-namespace-datafusion/src/namespace.rs new file mode 100755 index 0000000000..5dd7c2812c --- /dev/null +++ b/rust/lance-namespace-datafusion/src/namespace.rs @@ -0,0 +1,126 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; + +use lance::dataset::builder::DatasetBuilder; +use lance::{Dataset, Result}; +use lance_namespace::models::{ListNamespacesRequest, ListTablesRequest}; +use lance_namespace::LanceNamespace; + +const DEFAULT_CATALOG_NAME: &str = "lance"; + +/// Lightweight wrapper around a Lance namespace handle and identifier. +#[derive(Debug, Clone)] +pub struct Namespace { + root: Arc, + /// Full namespace identifier, e.g. [catalog, schema]. + namespace_id: Option>, +} + +impl From> for Namespace { + fn from(lance_namespace: Arc) -> Self { + Self::from_root(Arc::clone(&lance_namespace)) + } +} + +impl From<(Arc, String)> for Namespace { + fn from(lance_namespace: (Arc, String)) -> Self { + Self::from_namespace(Arc::clone(&lance_namespace.0), vec![lance_namespace.1]) + } +} + +impl From<(Arc, Vec)> for Namespace { + fn from(lance_namespace: (Arc, Vec)) -> Self { + Self::from_namespace(Arc::clone(&lance_namespace.0), lance_namespace.1) + } +} + +impl Namespace { + /// Construct a namespace rooted at the top-level Lance namespace. + pub fn from_root(root: Arc) -> Self { + Self { + root, + namespace_id: None, + } + } + + /// Construct a namespace for a specific child identifier under the root. + pub fn from_namespace(root: Arc, namespace_id: Vec) -> Self { + Self { + root, + namespace_id: Some(namespace_id), + } + } + + /// Return the full namespace identifier. + pub fn id(&self) -> Vec { + self.namespace_id.clone().unwrap_or_default() + } + + /// Human readable name for this namespace (last component or default). + pub fn name(&self) -> &str { + self.namespace_id + .as_deref() + .and_then(|v| v.last()) + .map_or(DEFAULT_CATALOG_NAME, |relative_name| relative_name.as_str()) + } + + fn child_id(&self, child_name: String) -> Vec { + match &self.namespace_id { + Some(namespace_id) => { + let mut child_namespace = namespace_id.clone(); + child_namespace.push(child_name); + child_namespace + } + None => vec![child_name], + } + } + + /// List direct child namespaces. + pub async fn children(&self) -> Result> { + let root = Arc::clone(&self.root); + let namespace_id = self.namespace_id.clone().unwrap_or_default(); + let request = ListNamespacesRequest { + id: Some(namespace_id.clone()), + page_token: None, + limit: None, + ..Default::default() + }; + + let namespaces = root.list_namespaces(request).await?.namespaces; + + Ok(namespaces + .into_iter() + .map(|relative_ns_id| { + Self::from_namespace(Arc::clone(&self.root), self.child_id(relative_ns_id)) + }) + .collect()) + } + + /// List table names under this namespace. 
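+    ///
+    /// Illustrative usage (hypothetical namespace layout):
+    ///
+    /// ```rust,ignore
+    /// let sales = Namespace::from_namespace(root, vec!["retail".into(), "sales".into()]);
+    /// for table_name in sales.tables().await? {
+    ///     println!("table: {table_name}");
+    /// }
+    /// ```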
+ pub async fn tables(&self) -> Result> { + let root = Arc::clone(&self.root); + let namespace_id = self.namespace_id.clone().unwrap_or_default(); + let request = ListTablesRequest { + id: Some(namespace_id), + page_token: None, + limit: None, + ..Default::default() + }; + + root.list_tables(request).await.map(|resp| resp.tables) + } + + /// Load a Lance dataset for the given table name in this namespace. + pub async fn load_dataset(&self, table_name: &str) -> Result { + DatasetBuilder::from_namespace( + Arc::clone(&self.root), + self.child_id(table_name.to_string()), + false, + ) + .await? + .load() + .await + } +} diff --git a/rust/lance-namespace-datafusion/src/schema.rs b/rust/lance-namespace-datafusion/src/schema.rs new file mode 100755 index 0000000000..baa397e650 --- /dev/null +++ b/rust/lance-namespace-datafusion/src/schema.rs @@ -0,0 +1,92 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! DataFusion `SchemaProvider` backed by a Lance namespace. +//! +//! This provider resolves tables on demand from a Lance [`Namespace`] +//! and caches `LanceTableProvider` instances per table name. It focuses +//! on read-only access; DML is handled elsewhere. + +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use dashmap::DashMap; +use datafusion::catalog::SchemaProvider; +use datafusion::datasource::TableProvider; +use datafusion::error::Result; + +use crate::error::to_datafusion_error; +use crate::namespace::Namespace; +use lance::datafusion::LanceTableProvider; + +/// Dynamic [`SchemaProvider`] backed directly by a [`Namespace`]. +/// +/// * `table(name)` calls `DatasetBuilder::from_namespace` via +/// [`Namespace::load_dataset`] and builds a fresh +/// [`LanceTableProvider`], caching it by table name. +#[derive(Debug, Clone)] +pub struct LanceSchemaProvider { + namespace: Namespace, + tables: DashMap>, +} + +impl LanceSchemaProvider { + pub async fn try_new(namespace: Namespace) -> Result { + Ok(Self { + namespace, + tables: DashMap::new(), + }) + } + + async fn load_and_cache_table( + &self, + table_name: &str, + ) -> Result>> { + let dataset = self + .namespace + .load_dataset(table_name) + .await + .map_err(to_datafusion_error)?; + let dataset = Arc::new(dataset); + let table_provider = Arc::new(LanceTableProvider::new(dataset, false, false)); + self.tables + .insert(table_name.to_string(), Arc::clone(&table_provider)); + Ok(Some(table_provider as Arc)) + } +} + +#[async_trait] +impl SchemaProvider for LanceSchemaProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn table_names(&self) -> Vec { + self.tables + .iter() + .map(|entry| entry.key().clone()) + .collect() + } + + async fn table(&self, table_name: &str) -> Result>> { + if let Some(existing) = self.tables.get(table_name) { + // Reuse cached provider when still fresh; otherwise reload. 
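+            // Freshness check: compare the version the cached dataset was
+            // opened at (`ds.version().version`) with the latest version
+            // reported for the table (`ds.latest_version_id()`). A mismatch
+            // means new commits exist, so the cache entry is dropped and the
+            // dataset reloaded; otherwise the cached provider is reused.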
+ let ds = existing.dataset(); + let latest = ds.latest_version_id().await.map_err(to_datafusion_error)?; + let is_stale = latest != ds.version().version; + if is_stale { + self.tables.remove(table_name); + self.load_and_cache_table(table_name).await + } else { + Ok(Some(Arc::clone(existing.value()) as Arc)) + } + } else { + self.load_and_cache_table(table_name).await + } + } + + fn table_exist(&self, name: &str) -> bool { + self.tables.contains_key(name) + } +} diff --git a/rust/lance-namespace-datafusion/src/session_builder.rs b/rust/lance-namespace-datafusion/src/session_builder.rs new file mode 100755 index 0000000000..e7df0019fb --- /dev/null +++ b/rust/lance-namespace-datafusion/src/session_builder.rs @@ -0,0 +1,85 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Builder for constructing DataFusion `SessionContext` instances +//! backed by Lance namespaces. +//! +//! This API is intentionally minimal and returns the native +//! `SessionContext` without adding an extra wrapper. It wires up +//! dynamic catalog and schema providers so that Lance namespaces can +//! be accessed via `catalog.schema.table` names. + +use std::sync::Arc; + +use datafusion::error::Result; +use datafusion::execution::context::{SessionConfig, SessionContext}; + +use crate::catalog::LanceCatalogProviderList; +use crate::namespace::Namespace; +use crate::LanceCatalogProvider; + +/// Builder for configuring a `SessionContext` with Lance namespaces. +#[derive(Clone, Debug, Default)] +pub struct SessionBuilder { + /// Optional root namespace exposed via a dynamic + /// `LanceCatalogProviderList`. + root: Option, + /// Explicit catalogs to register by name. + catalogs: Vec<(String, Namespace)>, + /// Optional DataFusion session configuration. + config: Option, +} + +impl SessionBuilder { + /// Create a new builder with no namespaces or configuration. + pub fn new() -> Self { + Self::default() + } + + /// Attach a root `LanceNamespace` that is exposed as a dynamic + /// catalog list via `LanceCatalogProviderList`. + pub fn with_root(mut self, ns: Namespace) -> Self { + self.root = Some(ns); + self + } + + /// Register an additional catalog backed by the given namespace. + /// + /// The catalog is identified by `name` and can later be combined + /// with schemas via `SessionBuilder::add_schema` using the same + /// namespace. + pub fn add_catalog(mut self, name: &str, ns: Namespace) -> Self { + self.catalogs.push((name.to_string(), ns)); + self + } + + /// Provide an explicit `SessionConfig` for the underlying + /// `SessionContext`. + pub fn with_config(mut self, config: SessionConfig) -> Self { + self.config = Some(config); + self + } + + /// Build a `SessionContext` with all configured namespaces. 
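+    ///
+    /// Illustrative usage (hypothetical namespace handles, mirroring the
+    /// integration tests):
+    ///
+    /// ```rust,ignore
+    /// let ctx = SessionBuilder::new()
+    ///     .with_root(Namespace::from_root(root_ns))
+    ///     .add_catalog("crm", Namespace::from_namespace(extra_ns, vec!["crm".into()]))
+    ///     .build()
+    ///     .await?;
+    /// ```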
+ pub async fn build(self) -> Result { + let ctx = if let Some(config) = self.config { + SessionContext::new_with_config(config) + } else { + SessionContext::new() + }; + + if let Some(root) = self.root { + let catalog_list = Arc::new(LanceCatalogProviderList::try_new(root).await?); + ctx.register_catalog_list(catalog_list); + } + + for (catalog_name, namespace) in self.catalogs { + ctx.register_catalog( + catalog_name, + Arc::new(LanceCatalogProvider::try_new(namespace).await?), + ); + } + + Ok(ctx) + } +} diff --git a/rust/lance-namespace-datafusion/tests/sql.rs b/rust/lance-namespace-datafusion/tests/sql.rs new file mode 100755 index 0000000000..e20901a83f --- /dev/null +++ b/rust/lance-namespace-datafusion/tests/sql.rs @@ -0,0 +1,418 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +use std::sync::Arc; + +use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator, StringArray}; +use arrow_schema::{DataType, Field, Schema}; +use datafusion::error::{DataFusionError, Result as DFResult}; +use datafusion::prelude::SessionContext; +use lance::dataset::{WriteMode, WriteParams}; +use lance::Dataset; +use lance_namespace::models::CreateNamespaceRequest; +use lance_namespace::LanceNamespace; +use lance_namespace_datafusion::{Namespace, SessionBuilder}; +use lance_namespace_impls::DirectoryNamespaceBuilder; +use tempfile::TempDir; + +struct Context { + #[allow(dead_code)] + root_dir: TempDir, + #[allow(dead_code)] + extra_dir: TempDir, + ctx: SessionContext, +} + +fn col(batch: &RecordBatch, idx: usize) -> &T { + batch.column(idx).as_any().downcast_ref::().unwrap() +} + +fn customers_data() -> (Arc, RecordBatch) { + let schema = Arc::new(Schema::new(vec![ + Field::new("customer_id", DataType::Int32, false), + Field::new("name", DataType::Utf8, false), + Field::new("city", DataType::Utf8, false), + ])); + + let customer_ids = Int32Array::from(vec![1, 2, 3]); + let names = StringArray::from(vec!["Alice", "Bob", "Carol"]); + let cities = StringArray::from(vec!["NY", "SF", "LA"]); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(customer_ids), Arc::new(names), Arc::new(cities)], + ) + .unwrap(); + + (schema, batch) +} + +fn orders_data() -> (Arc, RecordBatch) { + let schema = Arc::new(Schema::new(vec![ + Field::new("order_id", DataType::Int32, false), + Field::new("customer_id", DataType::Int32, false), + Field::new("amount", DataType::Int32, false), + ])); + + let order_ids = Int32Array::from(vec![101, 102, 103]); + let customer_ids = Int32Array::from(vec![1, 2, 3]); + let amounts = Int32Array::from(vec![100, 200, 300]); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(order_ids), + Arc::new(customer_ids), + Arc::new(amounts), + ], + ) + .unwrap(); + + (schema, batch) +} + +fn orders2_data() -> (Arc, RecordBatch) { + let schema = Arc::new(Schema::new(vec![ + Field::new("order_id", DataType::Int32, false), + Field::new("customer_id", DataType::Int32, false), + Field::new("amount", DataType::Int32, false), + ])); + + let order_ids = Int32Array::from(vec![201, 202]); + let customer_ids = Int32Array::from(vec![1, 2]); + let amounts = Int32Array::from(vec![150, 250]); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(order_ids), + Arc::new(customer_ids), + Arc::new(amounts), + ], + ) + .unwrap(); + + (schema, batch) +} + +fn customers_dim_data() -> (Arc, RecordBatch) { + let schema = Arc::new(Schema::new(vec![ + Field::new("customer_id", DataType::Int32, false), + 
Field::new("segment", DataType::Utf8, false), + ])); + + let customer_ids = Int32Array::from(vec![1, 2, 3]); + let segments = StringArray::from(vec!["Silver", "Gold", "Platinum"]); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(customer_ids), Arc::new(segments)], + ) + .unwrap(); + + (schema, batch) +} + +async fn write_table( + dir: &TempDir, + file_name: &str, + schema: Arc, + batch: RecordBatch, +) -> DFResult<()> { + let full_path = dir.path().join(file_name); + if let Some(parent) = full_path.parent() { + std::fs::create_dir_all(parent)?; + } + + let uri = full_path.to_str().unwrap().to_string(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + let write_params = WriteParams { + mode: WriteMode::Create, + ..Default::default() + }; + + Dataset::write(reader, &uri, Some(write_params)) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + Ok(()) +} + +async fn setup_test_context() -> DFResult { + let root_dir = TempDir::new()?; + let extra_dir = TempDir::new()?; + + let (customers_schema, customers_batch) = customers_data(); + write_table( + &root_dir, + "retail$sales$customers.lance", + customers_schema, + customers_batch, + ) + .await?; + + let (orders_schema, orders_batch) = orders_data(); + write_table( + &root_dir, + "retail$sales$orders.lance", + orders_schema, + orders_batch, + ) + .await?; + + let (orders2_schema, orders2_batch) = orders2_data(); + write_table( + &root_dir, + "wholesale$sales2$orders2.lance", + orders2_schema, + orders2_batch, + ) + .await?; + + let (dim_schema, dim_batch) = customers_dim_data(); + write_table( + &extra_dir, + "crm$dim$customers_dim.lance", + dim_schema, + dim_batch, + ) + .await?; + + let root_path = root_dir.path().to_string_lossy().to_string(); + let root_dir_ns = DirectoryNamespaceBuilder::new(root_path) + .manifest_enabled(true) + .dir_listing_enabled(true) + .build() + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let extra_path = extra_dir.path().to_string_lossy().to_string(); + let extra_dir_ns = DirectoryNamespaceBuilder::new(extra_path) + .manifest_enabled(true) + .dir_listing_enabled(true) + .build() + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + // Create nested namespaces for retail / wholesale / crm. 
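+    // Resulting layout (tables were written above with `$`-separated paths,
+    // namespaces are registered below):
+    //   root catalog list: retail.sales.{customers, orders},
+    //                      wholesale.sales2.orders2
+    //   "crm" catalog:     crm.dim.customers_dim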
+ let mut create_retail = CreateNamespaceRequest::new(); + create_retail.id = Some(vec!["retail".to_string()]); + root_dir_ns + .create_namespace(create_retail) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut create_sales = CreateNamespaceRequest::new(); + create_sales.id = Some(vec!["retail".to_string(), "sales".to_string()]); + root_dir_ns + .create_namespace(create_sales) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut create_wholesale = CreateNamespaceRequest::new(); + create_wholesale.id = Some(vec!["wholesale".to_string()]); + root_dir_ns + .create_namespace(create_wholesale) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut create_sales2 = CreateNamespaceRequest::new(); + create_sales2.id = Some(vec!["wholesale".to_string(), "sales2".to_string()]); + root_dir_ns + .create_namespace(create_sales2) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut create_crm = CreateNamespaceRequest::new(); + create_crm.id = Some(vec!["crm".to_string()]); + extra_dir_ns + .create_namespace(create_crm) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let mut create_dim = CreateNamespaceRequest::new(); + create_dim.id = Some(vec!["crm".to_string(), "dim".to_string()]); + extra_dir_ns + .create_namespace(create_dim) + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + root_dir_ns + .migrate() + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + extra_dir_ns + .migrate() + .await + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + + let root_ns: Arc = Arc::new(root_dir_ns); + let extra_ns: Arc = Arc::new(extra_dir_ns); + + let ctx = SessionBuilder::new() + .with_root(Namespace::from_root(Arc::clone(&root_ns))) + .add_catalog( + "crm", + Namespace::from_namespace(Arc::clone(&extra_ns), vec!["crm".to_string()]), + ) + .build() + .await?; + + Ok(Context { + root_dir, + extra_dir, + ctx, + }) +} + +#[tokio::test] +async fn join_within_retail() -> DFResult<()> { + let ns = setup_test_context().await?; + + let df = ns + .ctx + .sql( + "SELECT customers.name, orders.amount \ + FROM retail.sales.customers customers \ + JOIN retail.sales.orders orders \ + ON customers.customer_id = orders.customer_id \ + WHERE customers.customer_id = 2", + ) + .await?; + let batches = df.collect().await?; + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + + let name_col = col::(batch, 0); + let amount_col = col::(batch, 1); + + assert_eq!(name_col.value(0), "Bob"); + assert_eq!(amount_col.value(0), 200); + + Ok(()) +} + +#[tokio::test] +async fn join_across_root_catalogs() -> DFResult<()> { + let ns = setup_test_context().await?; + + let df = ns + .ctx + .sql( + "SELECT c.name, o2.amount \ + FROM retail.sales.customers c \ + JOIN wholesale.sales2.orders2 o2 \ + ON c.customer_id = o2.customer_id \ + WHERE o2.order_id = 202", + ) + .await?; + let batches = df.collect().await?; + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + + let name_col = col::(batch, 0); + let amount_col = col::(batch, 1); + + assert_eq!(name_col.value(0), "Bob"); + assert_eq!(amount_col.value(0), 250); + + Ok(()) +} + +#[tokio::test] +async fn join_across_catalogs() -> DFResult<()> { + let ns = setup_test_context().await?; + + let df = ns + .ctx + .sql( + "SELECT customers.name, dim.segment \ + FROM retail.sales.customers customers \ + JOIN crm.dim.customers_dim dim \ + ON 
customers.customer_id = dim.customer_id \ + WHERE customers.customer_id = 3", + ) + .await?; + let batches = df.collect().await?; + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + + let name_col = col::(batch, 0); + let segment_col = col::(batch, 1); + + assert_eq!(name_col.value(0), "Carol"); + assert_eq!(segment_col.value(0), "Platinum"); + + Ok(()) +} + +#[tokio::test] +async fn aggregation_city_totals() -> DFResult<()> { + let ns = setup_test_context().await?; + + let df = ns + .ctx + .sql( + "SELECT city, SUM(amount) AS total \ + FROM retail.sales.orders o \ + JOIN retail.sales.customers c \ + ON c.customer_id = o.customer_id \ + GROUP BY city \ + ORDER BY city", + ) + .await?; + let batches = df.collect().await?; + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 3); + + let city_col = col::(batch, 0); + let total_col = col::(batch, 1); + + assert_eq!(city_col.value(0), "LA"); + assert_eq!(total_col.value(0), 300); + + assert_eq!(city_col.value(1), "NY"); + assert_eq!(total_col.value(1), 100); + + assert_eq!(city_col.value(2), "SF"); + assert_eq!(total_col.value(2), 200); + + Ok(()) +} + +#[tokio::test] +async fn cte_view_customer_orders() -> DFResult<()> { + let ns = setup_test_context().await?; + + let df = ns + .ctx + .sql( + "WITH customer_orders AS ( \ + SELECT c.customer_id, c.name, o.order_id, o.amount \ + FROM retail.sales.customers c \ + JOIN retail.sales.orders o \ + ON c.customer_id = o.customer_id \ + ) \ + SELECT order_id, name, amount FROM customer_orders WHERE customer_id = 1", + ) + .await?; + let batches = df.collect().await?; + assert_eq!(batches.len(), 1); + let batch = &batches[0]; + assert_eq!(batch.num_rows(), 1); + + let order_id_col = col::(batch, 0); + let name_col = col::(batch, 1); + let amount_col = col::(batch, 2); + + assert_eq!(order_id_col.value(0), 101); + assert_eq!(name_col.value(0), "Alice"); + assert_eq!(amount_col.value(0), 100); + + Ok(()) +} From 958e5289738c514d6ecbfb32b78e612724e49e26 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Sun, 11 Jan 2026 23:40:09 +0800 Subject: [PATCH 2/7] fmt --- Cargo.lock | 2 +- .../lance-namespace-datafusion/src/catalog.rs | 22 +++++++++++++------ rust/lance-namespace-datafusion/src/lib.rs | 4 ++-- .../src/{namespace.rs => namespace_level.rs} | 10 ++++----- rust/lance-namespace-datafusion/src/schema.rs | 16 +++++++------- .../src/session_builder.rs | 10 ++++----- rust/lance-namespace-datafusion/tests/sql.rs | 6 ++--- 7 files changed, 39 insertions(+), 31 deletions(-) rename rust/lance-namespace-datafusion/src/{namespace.rs => namespace_level.rs} (94%) diff --git a/Cargo.lock b/Cargo.lock index 7ec7e9075f..1efdf41cc5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5180,7 +5180,7 @@ dependencies = [ [[package]] name = "lance-namespace-datafusion" -version = "2.0.0-beta.4" +version = "2.0.0-beta.6" dependencies = [ "arrow", "arrow-array", diff --git a/rust/lance-namespace-datafusion/src/catalog.rs b/rust/lance-namespace-datafusion/src/catalog.rs index 9ccc9cde73..16edef1e45 100755 --- a/rust/lance-namespace-datafusion/src/catalog.rs +++ b/rust/lance-namespace-datafusion/src/catalog.rs @@ -11,7 +11,7 @@ use dashmap::DashMap; use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider}; use datafusion::error::Result; -use crate::namespace::Namespace; +use crate::namespace_level::NamespaceLevel; use crate::schema::LanceSchemaProvider; /// A dynamic [`CatalogProviderList`] that maps Lance namespaces to catalogs. 
@@ -23,13 +23,16 @@ use crate::schema::LanceSchemaProvider; pub struct LanceCatalogProviderList { /// Root Lance namespace used to resolve catalogs / schemas / tables. #[allow(dead_code)] - namespace: Namespace, + ns_level: NamespaceLevel, /// Catalogs that have been loaded from the root namespace. + /// + /// Note: The values in this map may become stale over time, as there is currently + /// no mechanism to automatically refresh or invalidate cached catalog providers. catalogs: DashMap>, } impl LanceCatalogProviderList { - pub async fn try_new(namespace: Namespace) -> Result { + pub async fn try_new(namespace: NamespaceLevel) -> Result { let catalogs = DashMap::new(); for child_namespace in namespace.children().await? { let catalog_name = child_namespace.name().to_string(); @@ -38,7 +41,7 @@ impl LanceCatalogProviderList { } Ok(Self { - namespace, + ns_level: namespace, catalogs, }) } @@ -80,12 +83,14 @@ impl CatalogProviderList for LanceCatalogProviderList { #[derive(Debug, Clone)] pub struct LanceCatalogProvider { #[allow(dead_code)] - namespace: Namespace, + ns_level: NamespaceLevel, + /// Note: The values in this map may become stale over time, as there is currently + /// no mechanism to automatically refresh or invalidate cached schema providers. schemas: DashMap>, } impl LanceCatalogProvider { - pub async fn try_new(namespace: Namespace) -> Result { + pub async fn try_new(namespace: NamespaceLevel) -> Result { let schemas = DashMap::new(); for child_namespace in namespace.children().await? { let schema_name = child_namespace.name().to_string(); @@ -93,7 +98,10 @@ impl LanceCatalogProvider { schemas.insert(schema_name, schema_provider as Arc); } - Ok(Self { namespace, schemas }) + Ok(Self { + ns_level: namespace, + schemas, + }) } } diff --git a/rust/lance-namespace-datafusion/src/lib.rs b/rust/lance-namespace-datafusion/src/lib.rs index 6ef8fe7a42..bf4d5e3264 100755 --- a/rust/lance-namespace-datafusion/src/lib.rs +++ b/rust/lance-namespace-datafusion/src/lib.rs @@ -10,11 +10,11 @@ pub mod catalog; pub mod error; -pub mod namespace; +pub mod namespace_level; pub mod schema; pub mod session_builder; pub use catalog::{LanceCatalogProvider, LanceCatalogProviderList}; -pub use namespace::Namespace; +pub use namespace_level::NamespaceLevel; pub use schema::LanceSchemaProvider; pub use session_builder::SessionBuilder; diff --git a/rust/lance-namespace-datafusion/src/namespace.rs b/rust/lance-namespace-datafusion/src/namespace_level.rs similarity index 94% rename from rust/lance-namespace-datafusion/src/namespace.rs rename to rust/lance-namespace-datafusion/src/namespace_level.rs index 5dd7c2812c..2bcc282ade 100755 --- a/rust/lance-namespace-datafusion/src/namespace.rs +++ b/rust/lance-namespace-datafusion/src/namespace_level.rs @@ -12,31 +12,31 @@ const DEFAULT_CATALOG_NAME: &str = "lance"; /// Lightweight wrapper around a Lance namespace handle and identifier. #[derive(Debug, Clone)] -pub struct Namespace { +pub struct NamespaceLevel { root: Arc, /// Full namespace identifier, e.g. [catalog, schema]. 
namespace_id: Option>, } -impl From> for Namespace { +impl From> for NamespaceLevel { fn from(lance_namespace: Arc) -> Self { Self::from_root(Arc::clone(&lance_namespace)) } } -impl From<(Arc, String)> for Namespace { +impl From<(Arc, String)> for NamespaceLevel { fn from(lance_namespace: (Arc, String)) -> Self { Self::from_namespace(Arc::clone(&lance_namespace.0), vec![lance_namespace.1]) } } -impl From<(Arc, Vec)> for Namespace { +impl From<(Arc, Vec)> for NamespaceLevel { fn from(lance_namespace: (Arc, Vec)) -> Self { Self::from_namespace(Arc::clone(&lance_namespace.0), lance_namespace.1) } } -impl Namespace { +impl NamespaceLevel { /// Construct a namespace rooted at the top-level Lance namespace. pub fn from_root(root: Arc) -> Self { Self { diff --git a/rust/lance-namespace-datafusion/src/schema.rs b/rust/lance-namespace-datafusion/src/schema.rs index baa397e650..9bf1eb62b4 100755 --- a/rust/lance-namespace-datafusion/src/schema.rs +++ b/rust/lance-namespace-datafusion/src/schema.rs @@ -3,7 +3,7 @@ //! DataFusion `SchemaProvider` backed by a Lance namespace. //! -//! This provider resolves tables on demand from a Lance [`Namespace`] +//! This provider resolves tables on demand from a Lance [`NamespaceLevel`] //! and caches `LanceTableProvider` instances per table name. It focuses //! on read-only access; DML is handled elsewhere. @@ -17,24 +17,24 @@ use datafusion::datasource::TableProvider; use datafusion::error::Result; use crate::error::to_datafusion_error; -use crate::namespace::Namespace; +use crate::namespace_level::NamespaceLevel; use lance::datafusion::LanceTableProvider; -/// Dynamic [`SchemaProvider`] backed directly by a [`Namespace`]. +/// Dynamic [`SchemaProvider`] backed directly by a [`NamespaceLevel`]. /// /// * `table(name)` calls `DatasetBuilder::from_namespace` via -/// [`Namespace::load_dataset`] and builds a fresh +/// [`NamespaceLevel::load_dataset`] and builds a fresh /// [`LanceTableProvider`], caching it by table name. #[derive(Debug, Clone)] pub struct LanceSchemaProvider { - namespace: Namespace, + ns_level: NamespaceLevel, tables: DashMap>, } impl LanceSchemaProvider { - pub async fn try_new(namespace: Namespace) -> Result { + pub async fn try_new(namespace: NamespaceLevel) -> Result { Ok(Self { - namespace, + ns_level: namespace, tables: DashMap::new(), }) } @@ -44,7 +44,7 @@ impl LanceSchemaProvider { table_name: &str, ) -> Result>> { let dataset = self - .namespace + .ns_level .load_dataset(table_name) .await .map_err(to_datafusion_error)?; diff --git a/rust/lance-namespace-datafusion/src/session_builder.rs b/rust/lance-namespace-datafusion/src/session_builder.rs index e7df0019fb..0ee207e3cb 100755 --- a/rust/lance-namespace-datafusion/src/session_builder.rs +++ b/rust/lance-namespace-datafusion/src/session_builder.rs @@ -15,7 +15,7 @@ use datafusion::error::Result; use datafusion::execution::context::{SessionConfig, SessionContext}; use crate::catalog::LanceCatalogProviderList; -use crate::namespace::Namespace; +use crate::namespace_level::NamespaceLevel; use crate::LanceCatalogProvider; /// Builder for configuring a `SessionContext` with Lance namespaces. @@ -23,9 +23,9 @@ use crate::LanceCatalogProvider; pub struct SessionBuilder { /// Optional root namespace exposed via a dynamic /// `LanceCatalogProviderList`. - root: Option, + root: Option, /// Explicit catalogs to register by name. - catalogs: Vec<(String, Namespace)>, + catalogs: Vec<(String, NamespaceLevel)>, /// Optional DataFusion session configuration. 
config: Option, } @@ -38,7 +38,7 @@ impl SessionBuilder { /// Attach a root `LanceNamespace` that is exposed as a dynamic /// catalog list via `LanceCatalogProviderList`. - pub fn with_root(mut self, ns: Namespace) -> Self { + pub fn with_root(mut self, ns: NamespaceLevel) -> Self { self.root = Some(ns); self } @@ -48,7 +48,7 @@ impl SessionBuilder { /// The catalog is identified by `name` and can later be combined /// with schemas via `SessionBuilder::add_schema` using the same /// namespace. - pub fn add_catalog(mut self, name: &str, ns: Namespace) -> Self { + pub fn add_catalog(mut self, name: &str, ns: NamespaceLevel) -> Self { self.catalogs.push((name.to_string(), ns)); self } diff --git a/rust/lance-namespace-datafusion/tests/sql.rs b/rust/lance-namespace-datafusion/tests/sql.rs index e20901a83f..603120efcf 100755 --- a/rust/lance-namespace-datafusion/tests/sql.rs +++ b/rust/lance-namespace-datafusion/tests/sql.rs @@ -11,7 +11,7 @@ use lance::dataset::{WriteMode, WriteParams}; use lance::Dataset; use lance_namespace::models::CreateNamespaceRequest; use lance_namespace::LanceNamespace; -use lance_namespace_datafusion::{Namespace, SessionBuilder}; +use lance_namespace_datafusion::{NamespaceLevel, SessionBuilder}; use lance_namespace_impls::DirectoryNamespaceBuilder; use tempfile::TempDir; @@ -251,10 +251,10 @@ async fn setup_test_context() -> DFResult { let extra_ns: Arc = Arc::new(extra_dir_ns); let ctx = SessionBuilder::new() - .with_root(Namespace::from_root(Arc::clone(&root_ns))) + .with_root(NamespaceLevel::from_root(Arc::clone(&root_ns))) .add_catalog( "crm", - Namespace::from_namespace(Arc::clone(&extra_ns), vec!["crm".to_string()]), + NamespaceLevel::from_namespace(Arc::clone(&extra_ns), vec!["crm".to_string()]), ) .build() .await?; From d76047f0b5a52349c9641bd99b4d67d5b99b2a97 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 13 Jan 2026 14:59:55 +0800 Subject: [PATCH 3/7] update cargo.toml --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index 7e5797374a..bc93074a41 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5179,7 +5179,7 @@ dependencies = [ [[package]] name = "lance-namespace-datafusion" -version = "2.0.0-beta.6" +version = "2.0.0-beta.8" dependencies = [ "arrow", "arrow-array", From 8b58da900682f4d49679a1e906ecd43274a98547 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 13 Jan 2026 16:14:41 +0800 Subject: [PATCH 4/7] optimize docs --- rust/lance-namespace-datafusion/src/catalog.rs | 15 ++++++++++----- rust/lance-namespace-datafusion/src/schema.rs | 7 +++---- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/rust/lance-namespace-datafusion/src/catalog.rs b/rust/lance-namespace-datafusion/src/catalog.rs index 16edef1e45..f6a80ac745 100755 --- a/rust/lance-namespace-datafusion/src/catalog.rs +++ b/rust/lance-namespace-datafusion/src/catalog.rs @@ -16,9 +16,9 @@ use crate::schema::LanceSchemaProvider; /// A dynamic [`CatalogProviderList`] that maps Lance namespaces to catalogs. /// -/// Top-level namespaces (e.g. `ListNamespacesRequest { id: Some(vec![]) }`) -/// are exposed as catalog names. Child namespaces under a given catalog are -/// exposed as schemas via [`LanceCatalogProvider`]. +/// The underlying namespace must be a four-level namespace. It is explicitly configured +/// via [`SessionBuilder::with_root`], and each child namespace under this root is +/// automatically registered as a [`LanceCatalogProvider`]. 
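+///
+/// For example, with a four-level hierarchy `root -> retail -> sales -> customers`,
+/// the `customers` table is addressed in SQL as `retail.sales.customers`.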
#[derive(Debug, Clone)] pub struct LanceCatalogProviderList { /// Root Lance namespace used to resolve catalogs / schemas / tables. @@ -78,8 +78,13 @@ impl CatalogProviderList for LanceCatalogProviderList { } } -/// Dynamic [`CatalogProvider`] that views child namespaces under a single -/// top-level namespace as schemas. +/// A dynamic [`CatalogProvider`] that exposes the immediate child namespaces +/// of a Lance namespace as database schemas. +/// +/// The underlying namespace must be a three-level namespace. It is either explicitly +/// registered via [`SessionBuilder::add_catalog`], or automatically created as part of +/// the catalog hierarchy when [`SessionBuilder::with_root`] is used. +/// Child namespaces are automatically loaded as [`LanceSchemaProvider`] instances. #[derive(Debug, Clone)] pub struct LanceCatalogProvider { #[allow(dead_code)] diff --git a/rust/lance-namespace-datafusion/src/schema.rs b/rust/lance-namespace-datafusion/src/schema.rs index 9bf1eb62b4..64b4251796 100755 --- a/rust/lance-namespace-datafusion/src/schema.rs +++ b/rust/lance-namespace-datafusion/src/schema.rs @@ -20,11 +20,10 @@ use crate::error::to_datafusion_error; use crate::namespace_level::NamespaceLevel; use lance::datafusion::LanceTableProvider; -/// Dynamic [`SchemaProvider`] backed directly by a [`NamespaceLevel`]. +/// A dynamic [`SchemaProvider`] backed directly by a [`NamespaceLevel`]. /// -/// * `table(name)` calls `DatasetBuilder::from_namespace` via -/// [`NamespaceLevel::load_dataset`] and builds a fresh -/// [`LanceTableProvider`], caching it by table name. +/// Exposes Lance tables in the namespace as [`LanceTableProvider`] instances, +/// loaded on demand and cached by table name. #[derive(Debug, Clone)] pub struct LanceSchemaProvider { ns_level: NamespaceLevel, From 1c079449e97bcfa401abee6419d11f6c9bc0e99b Mon Sep 17 00:00:00 2001 From: majin1102 Date: Tue, 13 Jan 2026 16:22:08 +0800 Subject: [PATCH 5/7] optimize docs --- rust/lance-namespace-datafusion/src/catalog.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/rust/lance-namespace-datafusion/src/catalog.rs b/rust/lance-namespace-datafusion/src/catalog.rs index f6a80ac745..43464b7459 100755 --- a/rust/lance-namespace-datafusion/src/catalog.rs +++ b/rust/lance-namespace-datafusion/src/catalog.rs @@ -13,6 +13,8 @@ use datafusion::error::Result; use crate::namespace_level::NamespaceLevel; use crate::schema::LanceSchemaProvider; +#[allow(unused_imports)] +use crate::SessionBuilder; /// A dynamic [`CatalogProviderList`] that maps Lance namespaces to catalogs. 
/// From 68456a4910bc5213f7caf4638399860be4a370c2 Mon Sep 17 00:00:00 2001 From: "nathan.ma" Date: Thu, 15 Jan 2026 11:51:10 +0800 Subject: [PATCH 6/7] Update rust/lance-namespace-datafusion/tests/sql.rs Co-authored-by: Will Jones --- rust/lance-namespace-datafusion/tests/sql.rs | 21 ++++++-------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/rust/lance-namespace-datafusion/tests/sql.rs b/rust/lance-namespace-datafusion/tests/sql.rs index 603120efcf..6af54bb7d0 100755 --- a/rust/lance-namespace-datafusion/tests/sql.rs +++ b/rust/lance-namespace-datafusion/tests/sql.rs @@ -28,21 +28,12 @@ fn col(batch: &RecordBatch, idx: usize) -> &T { } fn customers_data() -> (Arc, RecordBatch) { - let schema = Arc::new(Schema::new(vec![ - Field::new("customer_id", DataType::Int32, false), - Field::new("name", DataType::Utf8, false), - Field::new("city", DataType::Utf8, false), - ])); - - let customer_ids = Int32Array::from(vec![1, 2, 3]); - let names = StringArray::from(vec!["Alice", "Bob", "Carol"]); - let cities = StringArray::from(vec!["NY", "SF", "LA"]); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(customer_ids), Arc::new(names), Arc::new(cities)], - ) - .unwrap(); + let batch = record_batch!( + ("customer_id", Int32, [1, 2, 3]), + ("name", Utf8, ["Alice", "Bob", "Carol"]), + ("city", Utf8, ["NY", "SF", "LA"]) + ).unwrap(); + let schema = batch.schema(); (schema, batch) } From 89032bb5a655c4c7bcee44a24fa74f21a601b655 Mon Sep 17 00:00:00 2001 From: majin1102 Date: Thu, 15 Jan 2026 12:31:41 +0800 Subject: [PATCH 7/7] address comments --- rust/lance-namespace-datafusion/src/error.rs | 16 ++--- rust/lance-namespace-datafusion/tests/sql.rs | 71 ++++++-------------- 2 files changed, 26 insertions(+), 61 deletions(-) diff --git a/rust/lance-namespace-datafusion/src/error.rs b/rust/lance-namespace-datafusion/src/error.rs index 321023a38e..633e67d26d 100755 --- a/rust/lance-namespace-datafusion/src/error.rs +++ b/rust/lance-namespace-datafusion/src/error.rs @@ -1,16 +1,10 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors -use datafusion::error::{DataFusionError, Result}; +use datafusion::error::DataFusionError; +use lance::Error; -/// Convert a Lance error into a DataFusion error. -/// -/// This keeps all Lance-specific error formatting in a single place. -pub fn to_datafusion_error(err: E) -> DataFusionError { - DataFusionError::Execution(err.to_string()) -} - -/// Convenience helper for wrapping fallible operations. -pub fn df_result(res: std::result::Result) -> Result { - res.map_err(to_datafusion_error) +/// Converts a lance error into a datafusion error. 
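+///
+/// Illustrative usage (hypothetical call site):
+///
+/// ```rust,ignore
+/// let dataset = ns_level
+///     .load_dataset(table_name)
+///     .await
+///     .map_err(to_datafusion_error)?;
+/// ```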
+pub fn to_datafusion_error(error: Error) -> DataFusionError { + DataFusionError::External(error.into()) } diff --git a/rust/lance-namespace-datafusion/tests/sql.rs b/rust/lance-namespace-datafusion/tests/sql.rs index 6af54bb7d0..3242f5ce23 100755 --- a/rust/lance-namespace-datafusion/tests/sql.rs +++ b/rust/lance-namespace-datafusion/tests/sql.rs @@ -4,7 +4,8 @@ use std::sync::Arc; use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator, StringArray}; -use arrow_schema::{DataType, Field, Schema}; +use arrow_schema::Schema; +use datafusion::common::record_batch; use datafusion::error::{DataFusionError, Result as DFResult}; use datafusion::prelude::SessionContext; use lance::dataset::{WriteMode, WriteParams}; @@ -29,77 +30,47 @@ fn col(batch: &RecordBatch, idx: usize) -> &T { fn customers_data() -> (Arc, RecordBatch) { let batch = record_batch!( - ("customer_id", Int32, [1, 2, 3]), - ("name", Utf8, ["Alice", "Bob", "Carol"]), - ("city", Utf8, ["NY", "SF", "LA"]) - ).unwrap(); + ("customer_id", Int32, vec![1, 2, 3]), + ("name", Utf8, vec!["Alice", "Bob", "Carol"]), + ("city", Utf8, vec!["NY", "SF", "LA"]) + ) + .unwrap(); let schema = batch.schema(); (schema, batch) } fn orders_data() -> (Arc, RecordBatch) { - let schema = Arc::new(Schema::new(vec![ - Field::new("order_id", DataType::Int32, false), - Field::new("customer_id", DataType::Int32, false), - Field::new("amount", DataType::Int32, false), - ])); - - let order_ids = Int32Array::from(vec![101, 102, 103]); - let customer_ids = Int32Array::from(vec![1, 2, 3]); - let amounts = Int32Array::from(vec![100, 200, 300]); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(order_ids), - Arc::new(customer_ids), - Arc::new(amounts), - ], + let batch = record_batch!( + ("order_id", Int32, vec![101, 102, 103]), + ("customer_id", Int32, vec![1, 2, 3]), + ("amount", Int32, vec![100, 200, 300]) ) .unwrap(); + let schema = batch.schema(); (schema, batch) } fn orders2_data() -> (Arc, RecordBatch) { - let schema = Arc::new(Schema::new(vec![ - Field::new("order_id", DataType::Int32, false), - Field::new("customer_id", DataType::Int32, false), - Field::new("amount", DataType::Int32, false), - ])); - - let order_ids = Int32Array::from(vec![201, 202]); - let customer_ids = Int32Array::from(vec![1, 2]); - let amounts = Int32Array::from(vec![150, 250]); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(order_ids), - Arc::new(customer_ids), - Arc::new(amounts), - ], + let batch = record_batch!( + ("order_id", Int32, vec![201, 202]), + ("customer_id", Int32, vec![1, 2]), + ("amount", Int32, vec![150, 250]) ) .unwrap(); + let schema = batch.schema(); (schema, batch) } fn customers_dim_data() -> (Arc, RecordBatch) { - let schema = Arc::new(Schema::new(vec![ - Field::new("customer_id", DataType::Int32, false), - Field::new("segment", DataType::Utf8, false), - ])); - - let customer_ids = Int32Array::from(vec![1, 2, 3]); - let segments = StringArray::from(vec!["Silver", "Gold", "Platinum"]); - - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(customer_ids), Arc::new(segments)], + let batch = record_batch!( + ("customer_id", Int32, vec![1, 2, 3]), + ("segment", Utf8, vec!["Silver", "Gold", "Platinum"]) ) .unwrap(); + let schema = batch.schema(); (schema, batch) }