diff --git a/docs/rest.md b/docs/rest.md new file mode 100644 index 0000000..39d9864 --- /dev/null +++ b/docs/rest.md @@ -0,0 +1,164 @@ +# Connecting to a Lance REST Namespace + +This guide explains how to connect DuckDB to a Lance REST Namespace server (e.g. LanceDB Cloud or Enterprise) using the lance-duckdb extension. + +## Overview + +A Lance REST Namespace provides a centralized catalog for managing Lance tables. The REST Namespace API allows: + +- Listing tables in a namespace +- Creating and dropping tables +- Describing tables (getting location and storage credentials) +- Credential vending for accessing underlying storage (S3, GCS, Azure, etc.) + +## Prerequisites + +1. A Lance REST Namespace server endpoint available +2. The lance-duckdb extension built and available +3. Authentication credentials (API key, bearer token, etc.) + +## ATTACH Syntax + +```sql +ATTACH '' AS ( + TYPE LANCE, + ENDPOINT '', + HEADER '=;=' +); +``` + +### Parameters + +| Parameter | Description | Required | +|-----------|-------------|----------| +| `` | The namespace identifier to connect to | Yes | +| `` | Local alias for the attached database | Yes | +| `ENDPOINT` | URL of the REST Namespace server | Yes | +| `HEADER` | Custom HTTP headers (semicolon-separated key=value pairs) | No | +| `DELIMITER` | Namespace delimiter (default: `$`) | No | +| `BEARER_TOKEN` | Bearer token for authentication | No | +| `API_KEY` | API key for authentication | No | + +### Custom Headers + +The `HEADER` option allows passing custom HTTP headers to the REST Namespace server. Multiple headers can be specified using semicolon separation: + +```sql +HEADER 'x-lancedb-database=my_db;x-api-key=sk_xxx;x-custom-header=value' +``` + +## Example: Connecting to a REST Namespace Server + +### 1. Connect from DuckDB + +```sql +-- Load the Lance extension +LOAD 'lance.duckdb_extension'; + +-- Attach to the REST Namespace +ATTACH 'ns1' AS lance_ns ( + TYPE LANCE, + ENDPOINT 'http://localhost:10024', + HEADER 'x-lancedb-database=lance_ns;x-api-key=sk_localtest' +); + +-- Switch to the attached database +USE lance_ns; +``` + +### 2. List Tables + +```sql +SHOW TABLES; +``` + +Output: +``` +┌─────────┐ +│ name │ +│ varchar │ +├─────────┤ +│ 0 rows │ +└─────────┘ +``` + +### 3. Create a Table + +```sql +CREATE TABLE users ( + id INTEGER, + name VARCHAR, + email VARCHAR +); +``` + +### 4. Insert Data + +```sql +INSERT INTO users VALUES + (1, 'Alice', 'alice@example.com'), + (2, 'Bob', 'bob@example.com'), + (3, 'Charlie', 'charlie@example.com'); +``` + +### 5. Query Data + +```sql +-- Select all rows +SELECT * FROM users; +``` + +Output: +``` +┌───────┬─────────┬─────────────────────┐ +│ id │ name │ email │ +│ int32 │ varchar │ varchar │ +├───────┼─────────┼─────────────────────┤ +│ 1 │ Alice │ alice@example.com │ +│ 2 │ Bob │ bob@example.com │ +│ 3 │ Charlie │ charlie@example.com │ +└───────┴─────────┴─────────────────────┘ +``` + +```sql +-- Select with filter +SELECT * FROM users WHERE id > 1; +``` + +Output: +``` +┌───────┬─────────┬─────────────────────┐ +│ id │ name │ email │ +│ int32 │ varchar │ varchar │ +├───────┼─────────┼─────────────────────┤ +│ 2 │ Bob │ bob@example.com │ +│ 3 │ Charlie │ charlie@example.com │ +└───────┴─────────┴─────────────────────┘ +``` + +```sql +-- Aggregation +SELECT COUNT(*) as total_users FROM users; +``` + +Output: +``` +┌─────────────┐ +│ total_users │ +│ int64 │ +├─────────────┤ +│ 3 │ +└─────────────┘ +``` + +### 6. Using Fully Qualified Names + +You can also use fully qualified table names without switching databases: + +```sql +-- Create table with fully qualified name +CREATE TABLE lance_ns.main.my_table (col1 INTEGER, col2 VARCHAR); + +-- Query with fully qualified name +SELECT * FROM lance_ns.main.my_table; +``` diff --git a/rust/ffi/namespace.rs b/rust/ffi/namespace.rs index 4b4074f..1e8c0b9 100644 --- a/rust/ffi/namespace.rs +++ b/rust/ffi/namespace.rs @@ -31,10 +31,27 @@ unsafe fn optional_cstr_to_string( Ok(Some(s.to_string())) } +fn parse_headers_tsv(headers_tsv: Option<&str>) -> Vec<(String, String)> { + headers_tsv + .map(|tsv| { + tsv.lines() + .filter_map(|line| { + let mut parts = line.splitn(2, '\t'); + match (parts.next(), parts.next()) { + (Some(k), Some(v)) if !k.is_empty() => Some((k.to_string(), v.to_string())), + _ => None, + } + }) + .collect() + }) + .unwrap_or_default() +} + fn build_config( endpoint: &str, bearer_token: Option<&str>, api_key: Option<&str>, + headers_tsv: Option<&str>, ) -> RestNamespaceBuilder { let mut builder = RestNamespaceBuilder::new(endpoint); if let Some(token) = bearer_token { @@ -43,6 +60,10 @@ fn build_config( if let Some(key) = api_key { builder = builder.header("x-api-key", key.to_string()); } + // Add custom headers from TSV + for (key, value) in parse_headers_tsv(headers_tsv) { + builder = builder.header(key, value); + } builder } @@ -65,17 +86,24 @@ fn list_tables_inner( bearer_token: *const c_char, api_key: *const c_char, delimiter: *const c_char, + headers_tsv: *const c_char, ) -> FfiResult> { let endpoint = unsafe { cstr_to_str(endpoint, "endpoint")? }; let namespace_id = unsafe { cstr_to_str(namespace_id, "namespace_id")? }; let delimiter = unsafe { optional_cstr_to_string(delimiter, "delimiter")? }; let bearer_token = unsafe { optional_cstr_to_string(bearer_token, "bearer_token")? }; let api_key = unsafe { optional_cstr_to_string(api_key, "api_key")? }; + let headers_tsv = unsafe { optional_cstr_to_string(headers_tsv, "headers_tsv")? }; let delimiter = delimiter.unwrap_or_else(|| "$".to_string()); - let namespace = build_config(endpoint, bearer_token.as_deref(), api_key.as_deref()) - .delimiter(delimiter) - .build(); + let namespace = build_config( + endpoint, + bearer_token.as_deref(), + api_key.as_deref(), + headers_tsv.as_deref(), + ) + .delimiter(delimiter) + .build(); let tables = runtime::block_on(async move { let mut out = Vec::new(); @@ -115,8 +143,9 @@ pub unsafe extern "C" fn lance_namespace_list_tables( bearer_token: *const c_char, api_key: *const c_char, delimiter: *const c_char, + headers_tsv: *const c_char, ) -> *const c_char { - match list_tables_inner(endpoint, namespace_id, bearer_token, api_key, delimiter) { + match list_tables_inner(endpoint, namespace_id, bearer_token, api_key, delimiter, headers_tsv) { Ok(tables) => { clear_last_error(); let joined = tables.join("\n"); @@ -135,17 +164,24 @@ fn describe_table_info_inner( bearer_token: *const c_char, api_key: *const c_char, delimiter: *const c_char, + headers_tsv: *const c_char, ) -> FfiResult<(String, String)> { let endpoint = unsafe { cstr_to_str(endpoint, "endpoint")? }; let table_id = unsafe { cstr_to_str(table_id, "table_id")? }; let delimiter = unsafe { optional_cstr_to_string(delimiter, "delimiter")? }; let bearer_token = unsafe { optional_cstr_to_string(bearer_token, "bearer_token")? }; let api_key = unsafe { optional_cstr_to_string(api_key, "api_key")? }; + let headers_tsv = unsafe { optional_cstr_to_string(headers_tsv, "headers_tsv")? }; let delimiter = delimiter.unwrap_or_else(|| "$".to_string()); - let namespace = build_config(endpoint, bearer_token.as_deref(), api_key.as_deref()) - .delimiter(delimiter) - .build(); + let namespace = build_config( + endpoint, + bearer_token.as_deref(), + api_key.as_deref(), + headers_tsv.as_deref(), + ) + .delimiter(delimiter) + .build(); let (location, storage_options_tsv) = runtime::block_on(async move { let mut req = DescribeTableRequest::new(); @@ -177,6 +213,7 @@ pub unsafe extern "C" fn lance_namespace_describe_table( bearer_token: *const c_char, api_key: *const c_char, delimiter: *const c_char, + headers_tsv: *const c_char, out_location: *mut *const c_char, out_storage_options_tsv: *mut *const c_char, ) -> i32 { @@ -191,7 +228,7 @@ pub unsafe extern "C" fn lance_namespace_describe_table( } } - match describe_table_info_inner(endpoint, table_id, bearer_token, api_key, delimiter) { + match describe_table_info_inner(endpoint, table_id, bearer_token, api_key, delimiter, headers_tsv) { Ok((location, storage_options_tsv)) => { clear_last_error(); if !out_location.is_null() { @@ -225,17 +262,24 @@ fn create_empty_table_inner( bearer_token: *const c_char, api_key: *const c_char, delimiter: *const c_char, + headers_tsv: *const c_char, ) -> FfiResult<(String, String)> { let endpoint = unsafe { cstr_to_str(endpoint, "endpoint")? }; let table_id = unsafe { cstr_to_str(table_id, "table_id")? }; let delimiter = unsafe { optional_cstr_to_string(delimiter, "delimiter")? }; let bearer_token = unsafe { optional_cstr_to_string(bearer_token, "bearer_token")? }; let api_key = unsafe { optional_cstr_to_string(api_key, "api_key")? }; + let headers_tsv = unsafe { optional_cstr_to_string(headers_tsv, "headers_tsv")? }; let delimiter = delimiter.unwrap_or_else(|| "$".to_string()); - let namespace = build_config(endpoint, bearer_token.as_deref(), api_key.as_deref()) - .delimiter(delimiter) - .build(); + let namespace = build_config( + endpoint, + bearer_token.as_deref(), + api_key.as_deref(), + headers_tsv.as_deref(), + ) + .delimiter(delimiter) + .build(); let (location, storage_options_tsv) = runtime::block_on(async move { let mut req = CreateEmptyTableRequest::new(); @@ -267,6 +311,7 @@ pub unsafe extern "C" fn lance_namespace_create_empty_table( bearer_token: *const c_char, api_key: *const c_char, delimiter: *const c_char, + headers_tsv: *const c_char, out_location: *mut *const c_char, out_storage_options_tsv: *mut *const c_char, ) -> i32 { @@ -281,7 +326,7 @@ pub unsafe extern "C" fn lance_namespace_create_empty_table( } } - match create_empty_table_inner(endpoint, table_id, bearer_token, api_key, delimiter) { + match create_empty_table_inner(endpoint, table_id, bearer_token, api_key, delimiter, headers_tsv) { Ok((location, storage_options_tsv)) => { clear_last_error(); if !out_location.is_null() { @@ -315,17 +360,24 @@ fn drop_table_inner( bearer_token: *const c_char, api_key: *const c_char, delimiter: *const c_char, + headers_tsv: *const c_char, ) -> FfiResult<()> { let endpoint = unsafe { cstr_to_str(endpoint, "endpoint")? }; let table_id = unsafe { cstr_to_str(table_id, "table_id")? }; let delimiter = unsafe { optional_cstr_to_string(delimiter, "delimiter")? }; let bearer_token = unsafe { optional_cstr_to_string(bearer_token, "bearer_token")? }; let api_key = unsafe { optional_cstr_to_string(api_key, "api_key")? }; + let headers_tsv = unsafe { optional_cstr_to_string(headers_tsv, "headers_tsv")? }; let delimiter = delimiter.unwrap_or_else(|| "$".to_string()); - let namespace = build_config(endpoint, bearer_token.as_deref(), api_key.as_deref()) - .delimiter(delimiter) - .build(); + let namespace = build_config( + endpoint, + bearer_token.as_deref(), + api_key.as_deref(), + headers_tsv.as_deref(), + ) + .delimiter(delimiter) + .build(); runtime::block_on(async move { let mut req = DropTableRequest::new(); @@ -349,8 +401,9 @@ pub unsafe extern "C" fn lance_namespace_drop_table( bearer_token: *const c_char, api_key: *const c_char, delimiter: *const c_char, + headers_tsv: *const c_char, ) -> i32 { - match drop_table_inner(endpoint, table_id, bearer_token, api_key, delimiter) { + match drop_table_inner(endpoint, table_id, bearer_token, api_key, delimiter, headers_tsv) { Ok(()) => { clear_last_error(); 0 @@ -368,17 +421,24 @@ fn open_dataset_in_namespace_inner( bearer_token: *const c_char, api_key: *const c_char, delimiter: *const c_char, + headers_tsv: *const c_char, ) -> FfiResult<(DatasetHandle, String)> { let endpoint = unsafe { cstr_to_str(endpoint, "endpoint")? }; let table_id = unsafe { cstr_to_str(table_id, "table_id")? }; let delimiter = unsafe { optional_cstr_to_string(delimiter, "delimiter")? }; let bearer_token = unsafe { optional_cstr_to_string(bearer_token, "bearer_token")? }; let api_key = unsafe { optional_cstr_to_string(api_key, "api_key")? }; + let headers_tsv = unsafe { optional_cstr_to_string(headers_tsv, "headers_tsv")? }; let delimiter = delimiter.unwrap_or_else(|| "$".to_string()); - let namespace = build_config(endpoint, bearer_token.as_deref(), api_key.as_deref()) - .delimiter(delimiter) - .build(); + let namespace = build_config( + endpoint, + bearer_token.as_deref(), + api_key.as_deref(), + headers_tsv.as_deref(), + ) + .delimiter(delimiter) + .build(); let (dataset, table_uri) = runtime::block_on(async move { let mut req = DescribeTableRequest::new(); @@ -422,6 +482,7 @@ pub unsafe extern "C" fn lance_open_dataset_in_namespace( bearer_token: *const c_char, api_key: *const c_char, delimiter: *const c_char, + headers_tsv: *const c_char, out_table_uri: *mut *const c_char, ) -> *mut c_void { if !out_table_uri.is_null() { @@ -430,7 +491,7 @@ pub unsafe extern "C" fn lance_open_dataset_in_namespace( } } - match open_dataset_in_namespace_inner(endpoint, table_id, bearer_token, api_key, delimiter) { + match open_dataset_in_namespace_inner(endpoint, table_id, bearer_token, api_key, delimiter, headers_tsv) { Ok((handle, table_uri)) => { clear_last_error(); if !out_table_uri.is_null() { diff --git a/src/include/lance_common.hpp b/src/include/lance_common.hpp index b2e7b07..b32453e 100644 --- a/src/include/lance_common.hpp +++ b/src/include/lance_common.hpp @@ -47,6 +47,7 @@ bool TryLanceNamespaceListTables(ClientContext &context, const string &endpoint, const string &namespace_id, const string &bearer_token, const string &api_key, const string &delimiter, + const string &headers_tsv, vector &out_tables, string &out_error); bool TryLanceDirNamespaceListTables(ClientContext &context, const string &root, @@ -57,25 +58,27 @@ void * LanceOpenDatasetInNamespace(ClientContext &context, const string &endpoint, const string &table_id, const string &bearer_token, const string &api_key, const string &delimiter, - string &out_table_uri); + const string &headers_tsv, string &out_table_uri); bool TryLanceNamespaceDescribeTable( ClientContext &context, const string &endpoint, const string &table_id, const string &bearer_token, const string &api_key, const string &delimiter, - string &out_location, vector &out_option_keys, - vector &out_option_values, string &out_error); + const string &headers_tsv, string &out_location, + vector &out_option_keys, vector &out_option_values, + string &out_error); bool TryLanceNamespaceCreateEmptyTable( ClientContext &context, const string &endpoint, const string &table_id, const string &bearer_token, const string &api_key, const string &delimiter, - string &out_location, vector &out_option_keys, - vector &out_option_values, string &out_error); + const string &headers_tsv, string &out_location, + vector &out_option_keys, vector &out_option_values, + string &out_error); bool TryLanceNamespaceDropTable(ClientContext &context, const string &endpoint, const string &table_id, const string &bearer_token, const string &api_key, const string &delimiter, - string &out_error); + const string &headers_tsv, string &out_error); class LanceTableEntry; diff --git a/src/include/lance_ffi.hpp b/src/include/lance_ffi.hpp index 52c91d3..15c5de6 100644 --- a/src/include/lance_ffi.hpp +++ b/src/include/lance_ffi.hpp @@ -22,26 +22,27 @@ int32_t lance_dir_namespace_drop_table(const char *root, const char *table_name, void *lance_open_dataset_in_dir_namespace( const char *root, const char *table_name, const char **option_keys, const char **option_values, size_t options_len, const char **out_table_uri); -const char *lance_namespace_list_tables(const char *endpoint, - const char *namespace_id, - const char *bearer_token, - const char *api_key, - const char *delimiter); -int32_t -lance_namespace_describe_table(const char *endpoint, const char *table_id, - const char *bearer_token, const char *api_key, - const char *delimiter, const char **out_location, - const char **out_storage_options_tsv); +const char * +lance_namespace_list_tables(const char *endpoint, const char *namespace_id, + const char *bearer_token, const char *api_key, + const char *delimiter, const char *headers_tsv); +int32_t lance_namespace_describe_table( + const char *endpoint, const char *table_id, const char *bearer_token, + const char *api_key, const char *delimiter, const char *headers_tsv, + const char **out_location, const char **out_storage_options_tsv); int32_t lance_namespace_create_empty_table( const char *endpoint, const char *table_id, const char *bearer_token, - const char *api_key, const char *delimiter, const char **out_location, - const char **out_storage_options_tsv); + const char *api_key, const char *delimiter, const char *headers_tsv, + const char **out_location, const char **out_storage_options_tsv); int32_t lance_namespace_drop_table(const char *endpoint, const char *table_id, const char *bearer_token, - const char *api_key, const char *delimiter); -void *lance_open_dataset_in_namespace( - const char *endpoint, const char *table_id, const char *bearer_token, - const char *api_key, const char *delimiter, const char **out_table_uri); + const char *api_key, const char *delimiter, + const char *headers_tsv); +void * +lance_open_dataset_in_namespace(const char *endpoint, const char *table_id, + const char *bearer_token, const char *api_key, + const char *delimiter, const char *headers_tsv, + const char **out_table_uri); void lance_close_dataset(void *dataset); void *lance_get_schema(void *dataset); diff --git a/src/include/lance_table_entry.hpp b/src/include/lance_table_entry.hpp index 229824d..bbfa734 100644 --- a/src/include/lance_table_entry.hpp +++ b/src/include/lance_table_entry.hpp @@ -16,6 +16,7 @@ struct LanceNamespaceTableConfig { string delimiter; string bearer_token_override; string api_key_override; + string headers_tsv; }; // LanceTableEntry represents a Lance dataset as a DuckDB base table entry. diff --git a/src/lance_common.cpp b/src/lance_common.cpp index 8585a9f..d7b3423 100644 --- a/src/lance_common.cpp +++ b/src/lance_common.cpp @@ -206,12 +206,10 @@ void BuildStorageOptionPointerArrays(const vector &option_keys, } } -bool TryLanceNamespaceListTables(ClientContext &context, const string &endpoint, - const string &namespace_id, - const string &bearer_token, - const string &api_key, const string &delimiter, - vector &out_tables, - string &out_error) { +bool TryLanceNamespaceListTables( + ClientContext &context, const string &endpoint, const string &namespace_id, + const string &bearer_token, const string &api_key, const string &delimiter, + const string &headers_tsv, vector &out_tables, string &out_error) { out_tables.clear(); out_error.clear(); @@ -219,10 +217,11 @@ bool TryLanceNamespaceListTables(ClientContext &context, const string &endpoint, bearer_token.empty() ? nullptr : bearer_token.c_str(); const char *api_key_ptr = api_key.empty() ? nullptr : api_key.c_str(); const char *delimiter_ptr = delimiter.empty() ? nullptr : delimiter.c_str(); + const char *headers_ptr = headers_tsv.empty() ? nullptr : headers_tsv.c_str(); - auto *ptr = - lance_namespace_list_tables(endpoint.c_str(), namespace_id.c_str(), - bearer_ptr, api_key_ptr, delimiter_ptr); + auto *ptr = lance_namespace_list_tables( + endpoint.c_str(), namespace_id.c_str(), bearer_ptr, api_key_ptr, + delimiter_ptr, headers_ptr); if (!ptr) { out_error = LanceConsumeLastError(); if (out_error.empty()) { @@ -268,8 +267,9 @@ static void ParseStorageOptionsTsv(const char *ptr, vector &out_keys, bool TryLanceNamespaceDescribeTable( ClientContext &context, const string &endpoint, const string &table_id, const string &bearer_token, const string &api_key, const string &delimiter, - string &out_location, vector &out_option_keys, - vector &out_option_values, string &out_error) { + const string &headers_tsv, string &out_location, + vector &out_option_keys, vector &out_option_values, + string &out_error) { (void)context; out_location.clear(); out_option_keys.clear(); @@ -280,12 +280,13 @@ bool TryLanceNamespaceDescribeTable( bearer_token.empty() ? nullptr : bearer_token.c_str(); const char *api_key_ptr = api_key.empty() ? nullptr : api_key.c_str(); const char *delimiter_ptr = delimiter.empty() ? nullptr : delimiter.c_str(); + const char *headers_ptr = headers_tsv.empty() ? nullptr : headers_tsv.c_str(); const char *location_ptr = nullptr; const char *options_ptr = nullptr; auto rc = lance_namespace_describe_table( endpoint.c_str(), table_id.c_str(), bearer_ptr, api_key_ptr, - delimiter_ptr, &location_ptr, &options_ptr); + delimiter_ptr, headers_ptr, &location_ptr, &options_ptr); if (rc != 0) { out_error = LanceConsumeLastError(); if (out_error.empty()) { @@ -304,8 +305,9 @@ bool TryLanceNamespaceDescribeTable( bool TryLanceNamespaceCreateEmptyTable( ClientContext &context, const string &endpoint, const string &table_id, const string &bearer_token, const string &api_key, const string &delimiter, - string &out_location, vector &out_option_keys, - vector &out_option_values, string &out_error) { + const string &headers_tsv, string &out_location, + vector &out_option_keys, vector &out_option_values, + string &out_error) { (void)context; out_location.clear(); out_option_keys.clear(); @@ -316,12 +318,13 @@ bool TryLanceNamespaceCreateEmptyTable( bearer_token.empty() ? nullptr : bearer_token.c_str(); const char *api_key_ptr = api_key.empty() ? nullptr : api_key.c_str(); const char *delimiter_ptr = delimiter.empty() ? nullptr : delimiter.c_str(); + const char *headers_ptr = headers_tsv.empty() ? nullptr : headers_tsv.c_str(); const char *location_ptr = nullptr; const char *options_ptr = nullptr; auto rc = lance_namespace_create_empty_table( endpoint.c_str(), table_id.c_str(), bearer_ptr, api_key_ptr, - delimiter_ptr, &location_ptr, &options_ptr); + delimiter_ptr, headers_ptr, &location_ptr, &options_ptr); if (rc != 0) { out_error = LanceConsumeLastError(); if (out_error.empty()) { @@ -341,7 +344,7 @@ bool TryLanceNamespaceDropTable(ClientContext &context, const string &endpoint, const string &table_id, const string &bearer_token, const string &api_key, const string &delimiter, - string &out_error) { + const string &headers_tsv, string &out_error) { (void)context; out_error.clear(); @@ -349,9 +352,11 @@ bool TryLanceNamespaceDropTable(ClientContext &context, const string &endpoint, bearer_token.empty() ? nullptr : bearer_token.c_str(); const char *api_key_ptr = api_key.empty() ? nullptr : api_key.c_str(); const char *delimiter_ptr = delimiter.empty() ? nullptr : delimiter.c_str(); + const char *headers_ptr = headers_tsv.empty() ? nullptr : headers_tsv.c_str(); - auto rc = lance_namespace_drop_table(endpoint.c_str(), table_id.c_str(), - bearer_ptr, api_key_ptr, delimiter_ptr); + auto rc = + lance_namespace_drop_table(endpoint.c_str(), table_id.c_str(), bearer_ptr, + api_key_ptr, delimiter_ptr, headers_ptr); if (rc != 0) { out_error = LanceConsumeLastError(); if (out_error.empty()) { @@ -406,18 +411,19 @@ void * LanceOpenDatasetInNamespace(ClientContext &context, const string &endpoint, const string &table_id, const string &bearer_token, const string &api_key, const string &delimiter, - string &out_table_uri) { + const string &headers_tsv, string &out_table_uri) { (void)context; out_table_uri.clear(); const char *bearer_ptr = bearer_token.empty() ? nullptr : bearer_token.c_str(); const char *api_key_ptr = api_key.empty() ? nullptr : api_key.c_str(); const char *delimiter_ptr = delimiter.empty() ? nullptr : delimiter.c_str(); + const char *headers_ptr = headers_tsv.empty() ? nullptr : headers_tsv.c_str(); const char *uri_ptr = nullptr; auto *dataset = lance_open_dataset_in_namespace( endpoint.c_str(), table_id.c_str(), bearer_ptr, api_key_ptr, - delimiter_ptr, &uri_ptr); + delimiter_ptr, headers_ptr, &uri_ptr); if (uri_ptr) { out_table_uri = uri_ptr; lance_free_string(uri_ptr); @@ -477,7 +483,7 @@ void *LanceOpenDatasetForTable(ClientContext &context, string table_uri; auto *dataset = LanceOpenDatasetInNamespace( context, cfg.endpoint, cfg.table_id, bearer_token, api_key, cfg.delimiter, - table_uri); + cfg.headers_tsv, table_uri); if (!table_uri.empty()) { out_display_uri = table_uri; } else { @@ -511,9 +517,10 @@ void ResolveLanceStorageOptionsForTable(ClientContext &context, string error; vector option_keys; vector option_values; - if (!TryLanceNamespaceDescribeTable( - context, cfg.endpoint, cfg.table_id, bearer_token, api_key, - cfg.delimiter, location, option_keys, option_values, error)) { + if (!TryLanceNamespaceDescribeTable(context, cfg.endpoint, cfg.table_id, + bearer_token, api_key, cfg.delimiter, + cfg.headers_tsv, location, option_keys, + option_values, error)) { throw IOException("Failed to describe Lance table via namespace: " + (error.empty() ? "unknown error" : error)); } diff --git a/src/lance_scan.cpp b/src/lance_scan.cpp index 0aa7b82..6e3de30 100644 --- a/src/lance_scan.cpp +++ b/src/lance_scan.cpp @@ -846,8 +846,10 @@ LanceNamespaceScanBind(ClientContext &context, TableFunctionBindInput &input, bearer_token, api_key); string table_uri; - result->dataset = LanceOpenDatasetInNamespace( - context, endpoint, table_id, bearer_token, api_key, delimiter, table_uri); + string headers_tsv; // TODO: Add support for headers in table function + result->dataset = + LanceOpenDatasetInNamespace(context, endpoint, table_id, bearer_token, + api_key, delimiter, headers_tsv, table_uri); if (!table_uri.empty()) { result->file_path = table_uri; } diff --git a/src/lance_storage.cpp b/src/lance_storage.cpp index c7fb30c..2b32ce2 100644 --- a/src/lance_storage.cpp +++ b/src/lance_storage.cpp @@ -59,6 +59,7 @@ struct LanceRestNamespaceConfig { string delimiter; string bearer_token_override; string api_key_override; + string headers_tsv; // Tab-separated key\tvalue pairs for custom headers }; static string GetLanceNamespaceEndpoint(const AttachInfo &info) { @@ -88,6 +89,52 @@ static string GetLanceNamespaceDelimiter(const AttachInfo &info) { return ""; } +// Parse HEADER options from ATTACH command +// Options like HEADER 'x-lancedb-database=lance_ns;x-api-key=sk_123' are parsed +// Multiple headers can be separated by semicolons within a single HEADER option +// Returns a TSV string with key\tvalue pairs separated by newlines +static string GetLanceNamespaceHeaders(const AttachInfo &info) { + string headers_tsv; + for (auto &kv : info.options) { + // Handle 'HEADER' option with 'key=value' format + if (StringUtil::CIEquals(kv.first, "header") && !kv.second.IsNull()) { + auto header_str = + kv.second.DefaultCastAs(LogicalType::VARCHAR).GetValue(); + // Split by semicolon to support multiple headers + vector header_parts; + size_t pos = 0; + while (pos < header_str.size()) { + auto next_semi = header_str.find(';', pos); + if (next_semi == string::npos) { + header_parts.push_back(header_str.substr(pos)); + break; + } + header_parts.push_back(header_str.substr(pos, next_semi - pos)); + pos = next_semi + 1; + } + for (auto &part : header_parts) { + // Trim whitespace + while (!part.empty() && isspace(part.front())) { + part.erase(part.begin()); + } + while (!part.empty() && isspace(part.back())) { + part.pop_back(); + } + auto eq_pos = part.find('='); + if (eq_pos != string::npos && eq_pos > 0) { + auto key = part.substr(0, eq_pos); + auto value = part.substr(eq_pos + 1); + if (!headers_tsv.empty()) { + headers_tsv += "\n"; + } + headers_tsv += key + "\t" + value; + } + } + } + } + return headers_tsv; +} + static void PopulateLanceTableColumnsFromDataset(ClientContext &context, void *dataset, ColumnList &out_columns) { @@ -162,19 +209,19 @@ ListDirectoryNamespaceTables(const LanceDirectoryNamespaceConfig &ns) { return out; } -static vector ListRestNamespaceTables(const string &endpoint, - const string &namespace_id, - const string &bearer_token, - const string &api_key, - const string &delimiter) { +static vector +ListRestNamespaceTables(const string &endpoint, const string &namespace_id, + const string &bearer_token, const string &api_key, + const string &delimiter, const string &headers_tsv) { const char *bearer_ptr = bearer_token.empty() ? nullptr : bearer_token.c_str(); const char *api_key_ptr = api_key.empty() ? nullptr : api_key.c_str(); const char *delimiter_ptr = delimiter.empty() ? nullptr : delimiter.c_str(); + const char *headers_ptr = headers_tsv.empty() ? nullptr : headers_tsv.c_str(); - auto *ptr = - lance_namespace_list_tables(endpoint.c_str(), namespace_id.c_str(), - bearer_ptr, api_key_ptr, delimiter_ptr); + auto *ptr = lance_namespace_list_tables( + endpoint.c_str(), namespace_id.c_str(), bearer_ptr, api_key_ptr, + delimiter_ptr, headers_ptr); if (!ptr) { throw IOException("Failed to list tables from Lance namespace: " + endpoint + "/" + namespace_id + LanceFormatErrorSuffix()); @@ -275,13 +322,15 @@ class LanceRestNamespaceDefaultGenerator : public DefaultGenerator { LanceRestNamespaceDefaultGenerator( Catalog &catalog, SchemaCatalogEntry &schema, string endpoint, string namespace_id, string bearer_token, string api_key, - string delimiter, string bearer_token_override, string api_key_override) + string delimiter, string bearer_token_override, string api_key_override, + string headers_tsv) : DefaultGenerator(catalog), schema(schema), endpoint(std::move(endpoint)), namespace_id(std::move(namespace_id)), bearer_token(std::move(bearer_token)), api_key(std::move(api_key)), delimiter(std::move(delimiter)), bearer_token_override(std::move(bearer_token_override)), - api_key_override(std::move(api_key_override)) {} + api_key_override(std::move(api_key_override)), + headers_tsv(std::move(headers_tsv)) {} unique_ptr CreateDefaultEntry(ClientContext &context, @@ -319,12 +368,12 @@ class LanceRestNamespaceDefaultGenerator : public DefaultGenerator { string table_uri; auto *dataset = LanceOpenDatasetInNamespace( context, endpoint, candidate_table_id, resolved_bearer, - resolved_api_key, delimiter, table_uri); + resolved_api_key, delimiter, headers_tsv, table_uri); if (!dataset && !fallback_table_id.empty()) { table_id = fallback_table_id; dataset = LanceOpenDatasetInNamespace(context, endpoint, table_id, resolved_bearer, resolved_api_key, - delimiter, table_uri); + delimiter, headers_tsv, table_uri); } if (!dataset) { return nullptr; @@ -347,13 +396,14 @@ class LanceRestNamespaceDefaultGenerator : public DefaultGenerator { cfg.delimiter = delimiter; cfg.bearer_token_override = bearer_token_override; cfg.api_key_override = api_key_override; + cfg.headers_tsv = headers_tsv; return make_uniq_base(catalog, schema, info, std::move(cfg)); } vector GetDefaultEntries() override { auto tables = ListRestNamespaceTables(endpoint, namespace_id, bearer_token, - api_key, delimiter); + api_key, delimiter, headers_tsv); if (namespace_id.empty()) { return tables; } @@ -376,6 +426,7 @@ class LanceRestNamespaceDefaultGenerator : public DefaultGenerator { string delimiter; string bearer_token_override; string api_key_override; + string headers_tsv; }; static string GetDatasetDirName(const string &table_name) { @@ -527,7 +578,8 @@ class LanceSchemaEntry final : public DuckSchemaEntry { string list_error; if (!TryLanceNamespaceListTables( context, rest_ns->endpoint, rest_ns->namespace_id, bearer_token, - api_key, rest_ns->delimiter, discovered, list_error)) { + api_key, rest_ns->delimiter, rest_ns->headers_tsv, discovered, + list_error)) { throw IOException("Failed to list tables from Lance namespace: " + (list_error.empty() ? "unknown error" : list_error)); } @@ -544,9 +596,9 @@ class LanceSchemaEntry final : public DuckSchemaEntry { } string drop_error; - if (!TryLanceNamespaceDropTable(context, rest_ns->endpoint, - table_id_for_ops, bearer_token, api_key, - rest_ns->delimiter, drop_error)) { + if (!TryLanceNamespaceDropTable( + context, rest_ns->endpoint, table_id_for_ops, bearer_token, + api_key, rest_ns->delimiter, rest_ns->headers_tsv, drop_error)) { throw IOException("Failed to drop Lance table via namespace: " + (drop_error.empty() ? "unknown error" : drop_error)); } @@ -648,7 +700,8 @@ class LanceSchemaEntry final : public DuckSchemaEntry { string list_error; if (!TryLanceNamespaceListTables( context, rest_ns->endpoint, rest_ns->namespace_id, bearer_token, - api_key, rest_ns->delimiter, discovered, list_error)) { + api_key, rest_ns->delimiter, rest_ns->headers_tsv, discovered, + list_error)) { throw IOException("Failed to list tables from Lance namespace: " + (list_error.empty() ? "unknown error" : list_error)); } @@ -681,7 +734,8 @@ class LanceSchemaEntry final : public DuckSchemaEntry { string drop_error; if (!TryLanceNamespaceDropTable(context, rest_ns->endpoint, table_id_for_ops, bearer_token, api_key, - rest_ns->delimiter, drop_error)) { + rest_ns->delimiter, + rest_ns->headers_tsv, drop_error)) { throw IOException( "Failed to drop Lance table via namespace: " + (drop_error.empty() ? "unknown error" : drop_error)); @@ -691,8 +745,8 @@ class LanceSchemaEntry final : public DuckSchemaEntry { string create_error; if (!TryLanceNamespaceCreateEmptyTable( context, rest_ns->endpoint, table_id_for_ops, bearer_token, - api_key, rest_ns->delimiter, dataset_path, option_keys, - option_values, create_error)) { + api_key, rest_ns->delimiter, rest_ns->headers_tsv, dataset_path, + option_keys, option_values, create_error)) { // Best-effort fallback for namespace implementations that do not use // a qualified object identifier for tables in ListTables. if (!prefixed_id.empty() && table_id_for_ops == prefixed_id) { @@ -702,8 +756,8 @@ class LanceSchemaEntry final : public DuckSchemaEntry { create_error.clear(); if (!TryLanceNamespaceCreateEmptyTable( context, rest_ns->endpoint, leaf_id, bearer_token, api_key, - rest_ns->delimiter, dataset_path, option_keys, option_values, - create_error)) { + rest_ns->delimiter, rest_ns->headers_tsv, dataset_path, + option_keys, option_values, create_error)) { throw IOException( "Failed to create Lance table via namespace: " + (create_error.empty() ? "unknown error" : create_error)); @@ -872,8 +926,8 @@ class LanceDuckCatalog final : public DuckCatalog { vector types_p, string endpoint, string namespace_id, string delimiter, string bearer_token_override, - string api_key_override, string table_name, - string writer_mode, + string api_key_override, string headers_tsv, + string table_name, string writer_mode, vector column_names_p, vector column_types_p, idx_t estimated_cardinality) @@ -884,6 +938,7 @@ class LanceDuckCatalog final : public DuckCatalog { delimiter(std::move(delimiter)), bearer_token_override(std::move(bearer_token_override)), api_key_override(std::move(api_key_override)), + headers_tsv(std::move(headers_tsv)), table_name(std::move(table_name)), writer_mode(std::move(writer_mode)), column_names(std::move(column_names_p)), @@ -902,6 +957,7 @@ class LanceDuckCatalog final : public DuckCatalog { string delimiter; string bearer_token_override; string api_key_override; + string headers_tsv; string table_name; string writer_mode; @@ -919,14 +975,16 @@ class LanceDuckCatalog final : public DuckCatalog { explicit GlobalState(string endpoint_p, string namespace_id_p, string delimiter_p, string bearer_override_p, - string api_override_p, string table_name_p, - string writer_mode_p, vector col_names_p, + string api_override_p, string headers_tsv_p, + string table_name_p, string writer_mode_p, + vector col_names_p, vector col_types_p) : endpoint(std::move(endpoint_p)), namespace_id(std::move(namespace_id_p)), delimiter(std::move(delimiter_p)), bearer_token_override(std::move(bearer_override_p)), api_key_override(std::move(api_override_p)), + headers_tsv(std::move(headers_tsv_p)), table_name(std::move(table_name_p)), writer_mode(std::move(writer_mode_p)), column_names(std::move(col_names_p)), @@ -944,8 +1002,8 @@ class LanceDuckCatalog final : public DuckCatalog { GetGlobalSinkState(ClientContext &context) const override { auto state = make_uniq( endpoint, namespace_id, delimiter, bearer_token_override, - api_key_override, table_name, writer_mode, column_names, - column_types); + api_key_override, headers_tsv, table_name, writer_mode, + column_names, column_types); auto props = context.GetClientProperties(); memset(&state->schema_root.arrow_schema, 0, @@ -981,7 +1039,8 @@ class LanceDuckCatalog final : public DuckCatalog { string list_error; if (!TryLanceNamespaceListTables( context, state->endpoint, state->namespace_id, bearer_token, - api_key, state->delimiter, discovered, list_error)) { + api_key, state->delimiter, state->headers_tsv, discovered, + list_error)) { throw IOException( "Failed to list tables from Lance namespace: " + (list_error.empty() ? "unknown error" : list_error)); @@ -1002,9 +1061,10 @@ class LanceDuckCatalog final : public DuckCatalog { // If overwriting, drop any existing table first. if (state->writer_mode == "overwrite") { string drop_error; - if (!TryLanceNamespaceDropTable( - context, state->endpoint, state->table_id, bearer_token, - api_key, state->delimiter, drop_error)) { + if (!TryLanceNamespaceDropTable(context, state->endpoint, + state->table_id, bearer_token, + api_key, state->delimiter, + state->headers_tsv, drop_error)) { throw IOException( "Failed to drop Lance table via namespace: " + (drop_error.empty() ? "unknown error" : drop_error)); @@ -1014,8 +1074,9 @@ class LanceDuckCatalog final : public DuckCatalog { string create_error; if (!TryLanceNamespaceCreateEmptyTable( context, state->endpoint, state->table_id, bearer_token, - api_key, state->delimiter, state->open_path, - state->option_keys, state->option_values, create_error)) { + api_key, state->delimiter, state->headers_tsv, + state->open_path, state->option_keys, state->option_values, + create_error)) { if (!prefixed_id.empty() && state->table_id == prefixed_id) { state->table_id = leaf_id; state->open_path.clear(); @@ -1024,8 +1085,9 @@ class LanceDuckCatalog final : public DuckCatalog { create_error.clear(); if (!TryLanceNamespaceCreateEmptyTable( context, state->endpoint, state->table_id, bearer_token, - api_key, state->delimiter, state->open_path, - state->option_keys, state->option_values, create_error)) { + api_key, state->delimiter, state->headers_tsv, + state->open_path, state->option_keys, + state->option_values, create_error)) { throw IOException( "Failed to create Lance table via namespace: " + (create_error.empty() ? "unknown error" : create_error)); @@ -1157,6 +1219,7 @@ class LanceDuckCatalog final : public DuckCatalog { string delimiter; string bearer_token_override; string api_key_override; + string headers_tsv; string table_name; string writer_mode; vector column_names; @@ -1181,7 +1244,8 @@ class LanceDuckCatalog final : public DuckCatalog { string list_error; if (!TryLanceNamespaceListTables( context, rest_ns->endpoint, rest_ns->namespace_id, bearer_token, - api_key, rest_ns->delimiter, discovered, list_error)) { + api_key, rest_ns->delimiter, rest_ns->headers_tsv, discovered, + list_error)) { throw IOException("Failed to list tables from Lance namespace: " + (list_error.empty() ? "unknown error" : list_error)); } @@ -1227,8 +1291,8 @@ class LanceDuckCatalog final : public DuckCatalog { auto &create_as = planner.Make( op.types, rest_ns->endpoint, rest_ns->namespace_id, rest_ns->delimiter, rest_ns->bearer_token_override, - rest_ns->api_key_override, create_info.table, mode, std::move(names), - std::move(types), op.estimated_cardinality); + rest_ns->api_key_override, rest_ns->headers_tsv, create_info.table, + mode, std::move(names), std::move(types), op.estimated_cardinality); create_as.children.push_back(plan); return create_as; } @@ -1378,10 +1442,19 @@ PhysicalOperator &LanceDuckCatalog::PlanDelete(ClientContext &context, static unique_ptr LanceStorageAttach(optional_ptr, ClientContext &context, AttachedDatabase &db, const string &name, AttachInfo &info, - AttachOptions &) { + AttachOptions &attach_options) { + // Consume Lance-specific options from attach_options.options so that + // DuckDB doesn't complain about unrecognized options when creating storage. + attach_options.options.erase("endpoint"); + attach_options.options.erase("delimiter"); + attach_options.options.erase("header"); + attach_options.options.erase("bearer_token"); + attach_options.options.erase("api_key"); + auto attach_path = info.path; auto endpoint = GetLanceNamespaceEndpoint(info); auto delimiter = GetLanceNamespaceDelimiter(info); + auto headers_tsv = GetLanceNamespaceHeaders(info); unique_ptr generator; shared_ptr directory_ns; @@ -1428,9 +1501,9 @@ LanceStorageAttach(optional_ptr, ClientContext &context, string list_error; vector discovered_tables; // Validate the namespace during ATTACH. - if (!TryLanceNamespaceListTables(context, endpoint, namespace_id, - bearer_token, api_key, delimiter, - discovered_tables, list_error)) { + if (!TryLanceNamespaceListTables( + context, endpoint, namespace_id, bearer_token, api_key, delimiter, + headers_tsv, discovered_tables, list_error)) { throw IOException("Failed to list tables from Lance namespace: " + list_error); } @@ -1441,6 +1514,7 @@ LanceStorageAttach(optional_ptr, ClientContext &context, rest_ns->delimiter = delimiter; rest_ns->bearer_token_override = bearer_token_override; rest_ns->api_key_override = api_key_override; + rest_ns->headers_tsv = headers_tsv; } // Back the attached catalog by an in-memory DuckCatalog that lazily @@ -1464,7 +1538,8 @@ LanceStorageAttach(optional_ptr, ClientContext &context, } else { generator = make_uniq( *catalog, schema, endpoint, namespace_id, std::move(bearer_token), - std::move(api_key), delimiter, bearer_token_override, api_key_override); + std::move(api_key), delimiter, bearer_token_override, api_key_override, + headers_tsv); } catalog_set.SetDefaultGenerator(std::move(generator));